Posted to commits@hive.apache.org by kh...@apache.org on 2017/05/02 17:32:39 UTC

hive git commit: HIVE-15642 : Replicate Insert Overwrites, Dynamic Partition Inserts and Loads (Sankar Hariappan & Vaibhav Gumashta, reviewed by Sushanth Sowmyan)

Repository: hive
Updated Branches:
  refs/heads/master 00b644482 -> c2637e6e1


HIVE-15642 : Replicate Insert Overwrites, Dynamic Partition Inserts and Loads (Sankar Hariappan & Vaibhav Gumashta, reviewed by Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c2637e6e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c2637e6e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c2637e6e

Branch: refs/heads/master
Commit: c2637e6e1c7ec6e8bb656788ff597df7b098e1ea
Parents: 00b6444
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Tue May 2 10:31:11 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Tue May 2 10:32:35 2017 -0700

----------------------------------------------------------------------
 .../hive/ql/TestReplicationScenarios.java       | 14 ++-
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 90 ++++++++++++++++----
 2 files changed, 84 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c2637e6e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 36874cd..b3cbae0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -897,6 +897,7 @@ public class TestReplicationScenarios {
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
     verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data);
 
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)");
     run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName
         + ".ptned PARTITION(b=1)");
     verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
@@ -926,6 +927,8 @@ public class TestReplicationScenarios {
 
     verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1);
     verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
   }
 
   @Test
@@ -987,8 +990,7 @@ public class TestReplicationScenarios {
 
     verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins);
 
-    // Commenting the below verifications for the replication of insert overwrites until HIVE-15642 patch is in
-    //verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite);
   }
 
   @Test
@@ -1036,8 +1038,12 @@ public class TestReplicationScenarios {
     verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2);
 
     String[] data_after_ovwrite = new String[] { "hundred" };
+    // Insert overwrite on existing partition
     run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=2) values('" + data_after_ovwrite[0] + "')");
     verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite);
+    // Insert overwrite on a new (not yet existing) partition, the same path as dynamic partition inserts
+    run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=3) values('" + data_after_ovwrite[0] + "')");
+    verifySetup("SELECT a from " + dbName + ".ptned where (b=3)", data_after_ovwrite);
 
     advanceDumpDir();
     run("REPL DUMP " + dbName + " FROM " + replDumpId);
@@ -1049,8 +1055,8 @@ public class TestReplicationScenarios {
     printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
 
-    // Commenting the below verifications for the replication of insert overwrites until HIVE-15642 patch is in
-    //verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=3)", data_after_ovwrite);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/c2637e6e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 45c77a2..dec73a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1592,6 +1592,19 @@ public class Hive {
     return getDatabase(currentDb);
   }
 
+  /**
+   * Load a directory into a Hive table partition.
+   * @param loadPath directory containing the files to load
+   * @param tableName name of the table to be loaded
+   * @param partSpec defines which partition needs to be loaded
+   * @param replace if true, replace the files in the partition, otherwise add files to it
+   * @param inheritTableSpecs if true, take location/inputformat/outputformat/serde details from the table spec
+   * @param isSkewedStoreAsSubdir if true, skewed values are stored in sub-directories (list bucketing)
+   * @param isSrcLocal if the source directory is LOCAL
+   * @param isAcid true if this is an ACID operation
+   * @param hasFollowingStatsTask true if a following task updates the stats, so this method need not
+   * @throws HiveException
+   */
   public void loadPartition(Path loadPath, String tableName,
       Map<String, String> partSpec, boolean replace,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
@@ -1620,7 +1633,11 @@ public class Hive {
    *          location/inputformat/outputformat/serde details from table spec
    * @param isSrcLocal
    *          If the source directory is LOCAL
-   * @param isAcid true if this is an ACID operation
+   * @param isAcid
+   *          true if this is an ACID operation
+   * @param hasFollowingStatsTask
+   *          true if there is a following task which updates the stats, so this method need not update them.
+   * @return Partition object being loaded with data
    */
   public Partition loadPartition(Path loadPath, Table tbl,
       Map<String, String> partSpec, boolean replace,
@@ -1629,6 +1646,7 @@ public class Hive {
 
     Path tblDataLocationPath =  tbl.getDataLocation();
     try {
+      // Get the partition object if it already exists
       Partition oldPart = getPartition(tbl, partSpec, false);
       /**
        * Move files before creating the partition since down stream processes
@@ -1666,15 +1684,19 @@ public class Hive {
       List<Path> newFiles = null;
       PerfLogger perfLogger = SessionState.getPerfLogger();
       perfLogger.PerfLogBegin("MoveTask", "FileMoves");
+
+      // If the FIRE_EVENTS_FOR_DML config is set, the table is not temporary, and the partition being
+      // inserted into already exists, capture the list of files added. For not yet existing partitions
+      // (insert overwrite to a new partition, or dynamic partition inserts), the add partition event captures the files added.
+      if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
+        newFiles = Collections.synchronizedList(new ArrayList<Path>());
+      }
+
       if (replace || (oldPart == null && !isAcid)) {
         boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
         replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
-            isSrcLocal, isAutoPurge);
+            isSrcLocal, isAutoPurge, newFiles);
       } else {
-        if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) {
-          newFiles = Collections.synchronizedList(new ArrayList<Path>());
-        }
-
         FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
         Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles);
       }
@@ -1682,13 +1704,17 @@ public class Hive {
       Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
       alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
       validatePartition(newTPart);
-      if ((null != newFiles) || replace) {
+
+      // Generate an insert event only if inserting into an existing partition
+      // When inserting into a new partition, the add partition event takes care of the insert event
+      if ((null != oldPart) && (null != newFiles)) {
         fireInsertEvent(tbl, partSpec, replace, newFiles);
       } else {
-        LOG.debug("No new files were created, and is not a replace. Skipping generating INSERT event.");
+        LOG.debug("No new files were created, and is not a replace, or we're inserting into a "
+                + "partition that does not exist yet. Skipping generating INSERT event.");
       }
 
-      //column stats will be inaccurate
+      // column stats will be inaccurate
       StatsSetupConst.clearColumnStatsState(newTPart.getParameters());
 
       // recreate the partition if it existed before
@@ -2035,7 +2061,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     if (replace) {
       Path tableDest = tbl.getPath();
       boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
-      replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal, isAutopurge);
+      replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal, isAutopurge, newFiles);
     } else {
       FileSystem fs;
       try {
@@ -3109,12 +3135,30 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
+  // Recursively list the new files in the destination path that were copied from the source.
+  public static void listNewFilesRecursively(final FileSystem destFs, Path dest,
+                                             List<Path> newFiles) throws HiveException {
+    try {
+      for (FileStatus fileStatus : destFs.listStatus(dest, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
+        if (fileStatus.isDirectory()) {
+          // If it is a sub-directory, then recursively list the files.
+          listNewFilesRecursively(destFs, fileStatus.getPath(), newFiles);
+        } else {
+          newFiles.add(fileStatus.getPath());
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to get source file statuses", e);
+      throw new HiveException(e.getMessage(), e);
+    }
+  }
+
   //it is assumed that parent directory of the destf should already exist when this
   //method is called. when the replace value is true, this method works a little different
   //from mv command if the destf is a directory, it replaces the destf instead of moving under
   //the destf. in this case, the replaced destf still preserves the original destf's permission
-  public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
-      boolean replace, boolean isSrcLocal) throws HiveException {
+  public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace,
+                                 boolean isSrcLocal) throws HiveException {
     final FileSystem srcFs, destFs;
     try {
       destFs = destf.getFileSystem(conf);
@@ -3125,7 +3169,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     try {
       srcFs = srcf.getFileSystem(conf);
     } catch (IOException e) {
-      LOG.error("Failed to get dest fs", e);
+      LOG.error("Failed to get src fs", e);
       throw new HiveException(e.getMessage(), e);
     }
 
@@ -3406,9 +3450,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
    *          When set to true files which needs to be deleted are not moved to Trash
    * @param isSrcLocal
    *          If the source directory is LOCAL
+   * @param newFiles
+   *          If non-null, filled with the paths of the new files placed in the destination path
    */
   protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
-          boolean isSrcLocal, boolean purge) throws HiveException {
+          boolean isSrcLocal, boolean purge, List<Path> newFiles) throws HiveException {
     try {
 
       FileSystem destFs = destf.getFileSystem(conf);
@@ -3478,11 +3524,23 @@ private void constructOneLBLocationMap(FileStatus fSta,
         if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal)) {
           throw new IOException("Error moving: " + srcf + " into: " + destf);
         }
-      } else { // its either a file or glob
+
+        // Record the paths of the files that were moved to the destination, if the caller asked for them
+        if (null != newFiles) {
+          listNewFilesRecursively(destFs, destf, newFiles);
+        }
+      } else {
+        // it's either a file or a glob
         for (FileStatus src : srcs) {
-          if (!moveFile(conf, src.getPath(), new Path(destf, src.getPath().getName()), true, isSrcLocal)) {
+          Path destFile = new Path(destf, src.getPath().getName());
+          if (!moveFile(conf, src.getPath(), destFile, true, isSrcLocal)) {
             throw new IOException("Error moving: " + srcf + " into: " + destf);
           }
+
+          // Record the path of the file that was moved to the destination, if the caller asked for it
+          if (null != newFiles) {
+            newFiles.add(destFile);
+          }
         }
       }
     } catch (IOException e) {
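
----------------------------------------------------------------------

Side note (not part of this commit): the new Hive.listNewFilesRecursively()
helper above is what lets replaceFiles() hand the list of moved files back to
its callers (loadPartition/loadTable), so that an INSERT event carrying those
files can be fired for replication. A minimal standalone sketch of the same
walk is below; the class name ListNewFilesSketch and the inlined hidden-file
check (skipping names that start with "_" or ".", standing in for
FileUtils.HIDDEN_FILES_PATH_FILTER) are illustrative assumptions, not code
from this patch.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListNewFilesSketch {

      // Walk a destination directory and collect every visible file,
      // descending into sub-directories, as listNewFilesRecursively() does.
      static void listRecursively(FileSystem fs, Path dir, List<Path> out)
          throws IOException {
        for (FileStatus st : fs.listStatus(dir)) {
          String name = st.getPath().getName();
          // Skip hidden entries, mirroring FileUtils.HIDDEN_FILES_PATH_FILTER.
          if (name.startsWith("_") || name.startsWith(".")) {
            continue;
          }
          if (st.isDirectory()) {
            listRecursively(fs, st.getPath(), out);
          } else {
            out.add(st.getPath());
          }
        }
      }

      public static void main(String[] args) throws IOException {
        // List the files under the directory given as the first argument.
        FileSystem fs = FileSystem.getLocal(new Configuration());
        List<Path> newFiles = new ArrayList<>();
        listRecursively(fs, new Path(args[0]), newFiles);
        for (Path p : newFiles) {
          System.out.println(p);
        }
      }
    }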