You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2017/05/02 17:32:39 UTC
hive git commit: HIVE-15642 : Replicate Insert Overwrites,
Dynamic Partition Inserts and Loads (Sankar Hariappan & Vaibhav
Gumashta, reviewed by Sushanth Sowmyan)
Repository: hive
Updated Branches:
refs/heads/master 00b644482 -> c2637e6e1
HIVE-15642 : Replicate Insert Overwrites, Dynamic Partition Inserts and Loads (Sankar Hariappan & Vaibhav Gumashta, reviewed by Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c2637e6e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c2637e6e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c2637e6e
Branch: refs/heads/master
Commit: c2637e6e1c7ec6e8bb656788ff597df7b098e1ea
Parents: 00b6444
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Tue May 2 10:31:11 2017 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Tue May 2 10:32:35 2017 -0700
----------------------------------------------------------------------
.../hive/ql/TestReplicationScenarios.java | 14 ++-
.../apache/hadoop/hive/ql/metadata/Hive.java | 90 ++++++++++++++++----
2 files changed, 84 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c2637e6e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 36874cd..b3cbae0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -897,6 +897,7 @@ public class TestReplicationScenarios {
run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data);
+ run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)");
run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName
+ ".ptned PARTITION(b=1)");
verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
@@ -926,6 +927,8 @@ public class TestReplicationScenarios {
verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1);
verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
}
@Test
@@ -987,8 +990,7 @@ public class TestReplicationScenarios {
verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins);
- // Commenting the below verifications for the replication of insert overwrites until HIVE-15642 patch is in
- //verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite);
+ verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite);
}
@Test
@@ -1036,8 +1038,12 @@ public class TestReplicationScenarios {
verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2);
String[] data_after_ovwrite = new String[] { "hundred" };
+ // Insert overwrite on existing partition
run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=2) values('" + data_after_ovwrite[0] + "')");
verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite);
+ // Insert overwrite into a further partition (b=3); note this uses a static partition spec, not a dynamic-partition insert
+ run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=3) values('" + data_after_ovwrite[0] + "')");
+ verifySetup("SELECT a from " + dbName + ".ptned where (b=3)", data_after_ovwrite);
advanceDumpDir();
run("REPL DUMP " + dbName + " FROM " + replDumpId);
@@ -1049,8 +1055,8 @@ public class TestReplicationScenarios {
printOutput();
run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
- // Commenting the below verifications for the replication of insert overwrites until HIVE-15642 patch is in
- //verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=3)", data_after_ovwrite);
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/c2637e6e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 45c77a2..dec73a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1592,6 +1592,19 @@ public class Hive {
return getDatabase(currentDb);
}
+ /**
+ * @param loadPath
+ * @param tableName
+ * @param partSpec
+ * @param replace
+ * @param inheritTableSpecs
+ * @param isSkewedStoreAsSubdir
+ * @param isSrcLocal
+ * @param isAcid
+ * @param hasFollowingStatsTask
+ * @return
+ * @throws HiveException
+ */
public void loadPartition(Path loadPath, String tableName,
Map<String, String> partSpec, boolean replace,
boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
@@ -1620,7 +1633,11 @@ public class Hive {
* location/inputformat/outputformat/serde details from table spec
* @param isSrcLocal
* If the source directory is LOCAL
- * @param isAcid true if this is an ACID operation
+ * @param isAcid
+ * true if this is an ACID operation
+ * @param hasFollowingStatsTask
+ * true if there is a following task which updates the stats, so, this method need not update.
+ * @return Partition object being loaded with data
*/
public Partition loadPartition(Path loadPath, Table tbl,
Map<String, String> partSpec, boolean replace,
@@ -1629,6 +1646,7 @@ public class Hive {
Path tblDataLocationPath = tbl.getDataLocation();
try {
+ // Get the partition object if it already exists
Partition oldPart = getPartition(tbl, partSpec, false);
/**
* Move files before creating the partition since down stream processes
@@ -1666,15 +1684,19 @@ public class Hive {
List<Path> newFiles = null;
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin("MoveTask", "FileMoves");
+
+ // If config is set, table is not temporary and partition being inserted exists, capture
+ // the list of files added. For not yet existing partitions (insert overwrite to new partition
+ // or dynamic partition inserts), the add partition event will capture the list of files added.
+ if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
+ newFiles = Collections.synchronizedList(new ArrayList<Path>());
+ }
+
if (replace || (oldPart == null && !isAcid)) {
boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
- isSrcLocal, isAutoPurge);
+ isSrcLocal, isAutoPurge, newFiles);
} else {
- if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) {
- newFiles = Collections.synchronizedList(new ArrayList<Path>());
- }
-
FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles);
}
@@ -1682,13 +1704,17 @@ public class Hive {
Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
validatePartition(newTPart);
- if ((null != newFiles) || replace) {
+
+ // Generate an insert event only if inserting into an existing partition
+ // When inserting into a new partition, the add partition event takes care of insert event
+ if ((null != oldPart) && (null != newFiles)) {
fireInsertEvent(tbl, partSpec, replace, newFiles);
} else {
- LOG.debug("No new files were created, and is not a replace. Skipping generating INSERT event.");
+ LOG.debug("No new files were created, and is not a replace, or we're inserting into a "
+ + "partition that does not exist yet. Skipping generating INSERT event.");
}
- //column stats will be inaccurate
+ // column stats will be inaccurate
StatsSetupConst.clearColumnStatsState(newTPart.getParameters());
// recreate the partition if it existed before
@@ -2035,7 +2061,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (replace) {
Path tableDest = tbl.getPath();
boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
- replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal, isAutopurge);
+ replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal, isAutopurge, newFiles);
} else {
FileSystem fs;
try {
@@ -3109,12 +3135,30 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
+ // Recursively appends to newFiles the path of every non-directory entry found under dest, as listed by destFs with FileUtils.HIDDEN_FILES_PATH_FILTER (presumably skipping hidden files — confirm). newFiles is not null-checked here, so callers must pass a non-null, mutable list. Listing failures are logged and rethrown as HiveException.
+ public static void listNewFilesRecursively(final FileSystem destFs, Path dest,
+ List<Path> newFiles) throws HiveException {
+ try {
+ for (FileStatus fileStatus : destFs.listStatus(dest, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
+ if (fileStatus.isDirectory()) {
+ // Sub-directory: descend into it and append its files to the same output list.
+ listNewFilesRecursively(destFs, fileStatus.getPath(), newFiles);
+ } else {
+ newFiles.add(fileStatus.getPath());
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to get source file statuses", e); // NOTE(review): the path being listed is dest, not a source path — message wording looks stale; confirm before changing
+ throw new HiveException(e.getMessage(), e);
+ }
+ }
+
//it is assumed that parent directory of the destf should already exist when this
//method is called. when the replace value is true, this method works a little different
//from mv command if the destf is a directory, it replaces the destf instead of moving under
//the destf. in this case, the replaced destf still preserves the original destf's permission
- public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
- boolean replace, boolean isSrcLocal) throws HiveException {
+ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace,
+ boolean isSrcLocal) throws HiveException {
final FileSystem srcFs, destFs;
try {
destFs = destf.getFileSystem(conf);
@@ -3125,7 +3169,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
try {
srcFs = srcf.getFileSystem(conf);
} catch (IOException e) {
- LOG.error("Failed to get dest fs", e);
+ LOG.error("Failed to get src fs", e);
throw new HiveException(e.getMessage(), e);
}
@@ -3406,9 +3450,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
* When set to true files which needs to be deleted are not moved to Trash
* @param isSrcLocal
* If the source directory is LOCAL
+ * @param newFiles
+ * Output the list of new files replaced in the destination path
*/
protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
- boolean isSrcLocal, boolean purge) throws HiveException {
+ boolean isSrcLocal, boolean purge, List<Path> newFiles) throws HiveException {
try {
FileSystem destFs = destf.getFileSystem(conf);
@@ -3478,11 +3524,23 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal)) {
throw new IOException("Error moving: " + srcf + " into: " + destf);
}
- } else { // its either a file or glob
+
+ // Add file paths of the files that will be moved to the destination if the caller needs it
+ if (null != newFiles) {
+ listNewFilesRecursively(destFs, destf, newFiles);
+ }
+ } else {
+ // its either a file or glob
for (FileStatus src : srcs) {
- if (!moveFile(conf, src.getPath(), new Path(destf, src.getPath().getName()), true, isSrcLocal)) {
+ Path destFile = new Path(destf, src.getPath().getName());
+ if (!moveFile(conf, src.getPath(), destFile, true, isSrcLocal)) {
throw new IOException("Error moving: " + srcf + " into: " + destf);
}
+
+ // Add file paths of the files that will be moved to the destination if the caller needs it
+ if (null != newFiles) {
+ newFiles.add(destFile);
+ }
}
}
} catch (IOException e) {