Posted to commits@hive.apache.org by th...@apache.org on 2017/08/16 04:14:59 UTC
hive git commit: HIVE-16990 : REPL LOAD should update last repl ID only after successful copy of data files. (Sankar Hariappan, reviewed by Anishek Agarwal, Thejas Nair)
Repository: hive
Updated Branches:
refs/heads/master 34b0e07a3 -> 60ae30ad2
HIVE-16990 : REPL LOAD should update last repl ID only after successful copy of data files. (Sankar Hariappan, reviewed by Anishek Agarwal, Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/60ae30ad
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/60ae30ad
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/60ae30ad
Branch: refs/heads/master
Commit: 60ae30ad217687cc2ad90c5297d7048a5bbb53b6
Parents: 34b0e07
Author: Sankar Hariappan <ma...@gmail.com>
Authored: Tue Aug 15 21:14:53 2017 -0700
Committer: Thejas M Nair <th...@hortonworks.com>
Committed: Tue Aug 15 21:14:53 2017 -0700
----------------------------------------------------------------------
.../hive/ql/parse/TestReplicationScenarios.java | 86 +++++----
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 2 +-
.../hadoop/hive/ql/exec/ReplCopyTask.java | 3 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 9 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 77 ++++----
.../ql/parse/ReplicationSemanticAnalyzer.java | 192 +++++++++++--------
.../hadoop/hive/ql/parse/ReplicationSpec.java | 48 ++---
.../dump/BootStrapReplicationSpecFunction.java | 6 +-
.../parse/repl/dump/io/PartitionSerializer.java | 10 +-
.../ql/parse/repl/dump/io/TableSerializer.java | 14 +-
.../parse/repl/load/UpdatedMetaDataTracker.java | 76 ++++++++
.../load/message/AbstractMessageHandler.java | 17 +-
.../repl/load/message/AddForeignKeyHandler.java | 3 +-
.../message/AddNotNullConstraintHandler.java | 3 +-
.../repl/load/message/AddPrimaryKeyHandler.java | 3 +-
.../message/AddUniqueConstraintHandler.java | 3 +-
.../load/message/CreateFunctionHandler.java | 3 +-
.../load/message/DropConstraintHandler.java | 3 +-
.../repl/load/message/DropFunctionHandler.java | 2 +-
.../repl/load/message/DropPartitionHandler.java | 3 +-
.../repl/load/message/DropTableHandler.java | 2 +-
.../parse/repl/load/message/InsertHandler.java | 3 +-
.../parse/repl/load/message/MessageHandler.java | 6 +-
.../load/message/RenamePartitionHandler.java | 3 +-
.../repl/load/message/RenameTableHandler.java | 5 +-
.../parse/repl/load/message/TableHandler.java | 3 +-
.../load/message/TruncatePartitionHandler.java | 3 +-
.../repl/load/message/TruncateTableHandler.java | 3 +-
.../hadoop/hive/ql/plan/AlterDatabaseDesc.java | 2 +-
29 files changed, 340 insertions(+), 253 deletions(-)
----------------------------------------------------------------------
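The gist of the fix: REPL LOAD previously stamped repl.last.id on the target objects as soon as an event was analyzed, so a failed data copy could leave the target claiming a replication state it never actually reached. After this patch, the repl-state update tasks are chained behind a barrier that depends on every import (DDL/COPY/MOVE) task, so the ID advances only once all copies succeed. A minimal sketch of that ordering idea follows; all names are illustrative and deliberately not Hive's actual task API:

import java.util.ArrayList;
import java.util.List;

interface Task {
    void run() throws Exception;
}

public class BarrierSketch {
    public static void main(String[] args) {
        List<Task> copyTasks = new ArrayList<>();
        copyTasks.add(() -> System.out.println("copy data file 1"));
        copyTasks.add(() -> System.out.println("copy data file 2"));

        Task updateReplState =
            () -> System.out.println("ALTER ... SET ('repl.last.id'='<event-id>')");

        try {
            // Barrier: every copy task must finish without throwing before
            // the repl-state update is allowed to run.
            for (Task copy : copyTasks) {
                copy.run();
            }
            updateReplState.run();
        } catch (Exception e) {
            // A failed copy aborts the chain: repl.last.id keeps its old value,
            // so a retried REPL LOAD will redo this event
            // (cf. testIncrementalLoadFailAndRetry below).
            System.err.println("copy failed, repl state not updated: " + e);
        }
    }
}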
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b020351..146fb37 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFil
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
@@ -298,7 +297,6 @@ public class TestReplicationScenarios {
String[] unptn_data = new String[]{ "eleven" , "twelve" };
String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
- String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
String[] empty = new String[]{};
String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
@@ -309,7 +307,6 @@ public class TestReplicationScenarios {
createTestDataFile(unptn_locn, unptn_data);
createTestDataFile(ptn_locn_1, ptn_data_1);
createTestDataFile(ptn_locn_2, ptn_data_2);
- createTestDataFile(ptn_locn_2_later, ptn_data_2_later);
run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
run("SELECT * from " + dbName + ".unptned", driver);
@@ -332,15 +329,9 @@ public class TestReplicationScenarios {
// Table dropped after "repl dump"
run("DROP TABLE " + dbName + ".unptned", driver);
+
// Partition dropped after "repl dump"
run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)", driver);
- // File changed after "repl dump"
- Partition p = metaStoreClient.getPartition(dbName, "ptned", "b=2");
- Path loc = new Path(p.getSd().getLocation());
- FileSystem fs = loc.getFileSystem(hconf);
- Path file = fs.listStatus(loc)[0].getPath();
- fs.delete(file, false);
- fs.copyFromLocalFile(new Path(ptn_locn_2_later), file);
run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
printOutput(driverMirror);
@@ -353,10 +344,8 @@ public class TestReplicationScenarios {
verifyResults(unptn_data, driverMirror);
run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", driverMirror);
verifyResults(ptn_data_1, driverMirror);
- // Since partition(b=2) changed manually, Hive cannot find
- // it in original location and cmroot, thus empty
run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", driverMirror);
- verifyResults(empty, driverMirror);
+ verifyResults(ptn_data_2, driverMirror);
run("SELECT a from " + dbName + ".ptned_empty", driverMirror);
verifyResults(empty, driverMirror);
run("SELECT * from " + dbName + ".unptned_empty", driverMirror);
@@ -1281,6 +1270,7 @@ public class TestReplicationScenarios {
String[] ptn_data = new String[]{ "ten"};
run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data[0] + "')", driver);
+ run("DROP TABLE " + dbName + ".ptned", driver);
// Inject a behaviour where it throws exception if an INSERT event is found
// As we dynamically add a partition through INSERT INTO cmd, it should just add ADD_PARTITION
@@ -1300,6 +1290,7 @@ public class TestReplicationScenarios {
if (event.getDbName().equalsIgnoreCase(dbName)) {
if (event.getEventType().equals("INSERT")) {
// If an insert event is found, then return null hence no event is dumped.
+ LOG.error("Encountered INSERT event when it was not expected to");
return null;
}
}
@@ -1316,7 +1307,7 @@ public class TestReplicationScenarios {
eventTypeValidator.assertInjectionsPerformed(true,false);
InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
- verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data, driverMirror);
+ verifyIfTableNotExist(replDbName, "ptned", metaStoreClientMirror);
}
@Test
@@ -2546,32 +2537,58 @@ public class TestReplicationScenarios {
// Replicate all the events that happened so far
Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName);
- verifySetup("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
- verifySetup("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
+ verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
+ verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
}
@Test
- public void testStatus() throws IOException {
- // first test ReplStateMap functionality
- Map<String,Long> cmap = new ReplStateMap<String,Long>();
+ public void testIncrementalLoadFailAndRetry() throws IOException {
+ String testName = "incrementalLoadFailAndRetry";
+ String dbName = createDB(testName, driver);
+ run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
- Long oldV;
- oldV = cmap.put("a",1L);
- assertEquals(1L,cmap.get("a").longValue());
- assertEquals(null,oldV);
+ // Bootstrap dump/load
+ String replDbName = dbName + "_dupe";
+ Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
- cmap.put("b",2L);
- oldV = cmap.put("b",-2L);
- assertEquals(2L, cmap.get("b").longValue());
- assertEquals(2L, oldV.longValue());
+ // Prefixed with incrementalLoadFailAndRetry to avoid finding entry in cmpath
+ String[] ptn_data_1 = new String[] { "incrementalLoadFailAndRetry_fifteen" };
+ String[] empty = new String[] {};
- cmap.put("c",3L);
- oldV = cmap.put("c",33L);
- assertEquals(33L, cmap.get("c").longValue());
- assertEquals(3L, oldV.longValue());
+ run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=1) values('" + ptn_data_1[0] + "')", driver);
+ run("CREATE TABLE " + dbName + ".ptned_tmp AS SELECT * FROM " + dbName + ".ptned", driver);
- // Now, to actually testing status - first, we bootstrap.
+ // Move the data files of this newly created partition to a temp location
+ Partition ptn = null;
+ try {
+ ptn = metaStoreClient.getPartition(dbName, "ptned", new ArrayList<>(Arrays.asList("1")));
+ } catch (Exception e) {
+ assert(false);
+ }
+
+ Path ptnLoc = new Path(ptn.getSd().getLocation());
+ Path tmpLoc = new Path(TEST_PATH + "/incrementalLoadFailAndRetry");
+ FileSystem dataFs = ptnLoc.getFileSystem(hconf);
+ assert(dataFs.rename(ptnLoc, tmpLoc));
+
+ // Replicate all the events that happened so far. It should fail as the data files are
+ // missing in the original path and not available in CM as well.
+ Tuple incrDump = replDumpDb(dbName, bootstrapDump.lastReplId, null, null);
+ verifyFail("REPL LOAD " + replDbName + " FROM '" + incrDump.dumpLocation + "'", driverMirror);
+
+ verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror);
+ verifyFail("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", driverMirror);
+ // Move the files back to original data location
+ assert(dataFs.rename(tmpLoc, ptnLoc));
+ loadAndVerify(replDbName, incrDump.dumpLocation, incrDump.lastReplId);
+
+ verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
+ verifyRun("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", ptn_data_1, driverMirror);
+ }
+
+ @Test
+ public void testStatus() throws IOException {
String name = testName.getMethodName();
String dbName = createDB(name, driver);
advanceDumpDir();
@@ -3000,8 +3017,9 @@ public class TestReplicationScenarios {
} catch (AssertionError ae){
LOG.warn("AssertionError:",ae);
throw new RuntimeException(ae);
+ } catch (Exception e) {
+ success = false;
}
-
assertFalse(success);
}
@@ -3034,7 +3052,7 @@ public class TestReplicationScenarios {
boolean success = false;
try {
CommandProcessorResponse ret = myDriver.run(cmd);
- success = (ret.getException() == null);
+ success = ((ret.getException() == null) && (ret.getErrorMessage() == null));
if (!success){
LOG.warn("Error {} : {} running [{}].", ret.getErrorCode(), ret.getErrorMessage(), cmd);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 3d69d19..364db27 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3646,7 +3646,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
if (allPartitions == null) {
db.alterTable(alterTbl.getOldName(), tbl, alterTbl.getIsCascade(), alterTbl.getEnvironmentContext());
} else {
- db.alterPartitions(tbl.getTableName(), allPartitions, alterTbl.getEnvironmentContext());
+ db.alterPartitions(alterTbl.getOldName(), allPartitions, alterTbl.getEnvironmentContext());
}
// Add constraints if necessary
addConstraints(db, alterTbl);
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 07f9167..54746d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -165,8 +165,9 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
.getPath();
filePaths.add(f);
} catch (MetaException e) {
- // skip and issue warning for missing file
+ // issue warning for missing file and throw exception
LOG.warn("Cannot find " + fileWithChksum[0] + " in source repo or cmroot");
+ throw new IOException(e.getMessage());
}
// Note - we need srcFs rather than fs, because it is possible that the _files lists files
// which are from a different filesystem than the fs where the _files file itself was loaded
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 05fc5e4..67a67fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -166,7 +166,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
- return getNewReplicationSpec(eventId.toString(), eventId.toString());
+ ReplicationSpec rspec = getNewReplicationSpec(eventId.toString(), eventId.toString());
+ rspec.setIsIncrementalDump(true);
+ return rspec;
}
private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
@@ -251,12 +253,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
private ReplicationSpec getNewReplicationSpec() throws TException {
ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set");
rspec.setCurrentReplicationState(String.valueOf(getHive().getMSC()
- .getCurrentNotificationEventId().getEventId()));
+ .getCurrentNotificationEventId().getEventId()));
return rspec;
}
private ReplicationSpec getNewReplicationSpec(String evState, String objState) {
- return new ReplicationSpec(true, false, evState, objState, false, true, true);
+ return new ReplicationSpec(true, false, false, evState, objState,
+ false, true, true);
}
private String getNextDumpDir() {
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 37edd5c..606a414 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -59,10 +59,10 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.DropTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -143,7 +143,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx),
- null, null);
+ null);
} catch (SemanticException e) {
throw e;
@@ -180,7 +180,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
String parsedLocation, String parsedTableName, String parsedDbName,
LinkedHashMap<String, String> parsedPartSpec,
String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x,
- Map<String,Long> dbsUpdated, Map<String,Long> tablesUpdated
+ UpdatedMetaDataTracker updatedMetadata
) throws IOException, MetaException, HiveException, URISyntaxException {
// initialize load path
@@ -200,6 +200,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
ReplicationSpec replicationSpec = rv.getReplicationSpec();
if (replicationSpec.isNoop()){
// nothing to do here, silently return.
+ x.getLOG().debug("Current update with ID:{} is noop",
+ replicationSpec.getCurrentReplicationState());
return false;
}
@@ -208,11 +210,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
// If the parsed statement contained a db.tablename specification, prefer that.
dbname = parsedDbName;
}
- if (dbsUpdated != null){
- dbsUpdated.put(
- dbname,
- Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID)));
- }
// Create table associated with the import
// Executed if relevant, and used to contain all the other details about the table if not.
@@ -223,7 +220,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
throw new HiveException(e);
}
- if ((replicationSpec!= null) && replicationSpec.isInReplicationScope()){
+ if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
tblDesc.setReplicationSpec(replicationSpec);
}
@@ -242,11 +239,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){
tblDesc.setTableName(parsedTableName);
}
- if (tablesUpdated != null){
- tablesUpdated.put(
- dbname + "." + tblDesc.getTableName(),
- Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID)));
- }
List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
Iterable<Partition> partitions = rv.getPartitions();
@@ -305,8 +297,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
} else {
createReplImportTasks(
tblDesc, partitionDescs,
- isPartSpecSet, replicationSpec, waitOnPrecursor, table,
- fromURI, fs, wh, x);
+ replicationSpec, waitOnPrecursor, table,
+ fromURI, fs, wh, x, updatedMetadata);
}
return tableExists;
}
@@ -357,14 +349,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
}
- private static Task<?> dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x){
- return TaskFactory.get(new DDLWork(
- x.getInputs(),
- x.getOutputs(),
- new DropTableDesc(table.getTableName(), null, true, true, null)
- ), x.getConf());
- }
-
private static Task<? extends Serializable> alterTableTask(ImportTableDesc tableDesc,
EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) {
tableDesc.setReplaceMode(true);
@@ -790,12 +774,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
private static void createReplImportTasks(
ImportTableDesc tblDesc,
List<AddPartitionDesc> partitionDescs,
- boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor,
+ ReplicationSpec replicationSpec, boolean waitOnPrecursor,
Table table, URI fromURI, FileSystem fs, Warehouse wh,
- EximUtil.SemanticAnalyzerWrapperContext x)
+ EximUtil.SemanticAnalyzerWrapperContext x,
+ UpdatedMetaDataTracker updatedMetadata)
throws HiveException, URISyntaxException, IOException, MetaException {
- Task dr = null;
WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
// Normally, on import, trying to create a table or a partition in a db that does not yet exist
@@ -813,16 +797,27 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (table != null) {
if (!replicationSpec.allowReplacementInto(table.getParameters())) {
// If the target table exists and is newer or same as current update based on repl.last.id, then just noop it.
+ x.getLOG().info("Table {}.{} is not replaced as it is newer than the update",
+ tblDesc.getDatabaseName(), tblDesc.getTableName());
return;
}
} else {
// If table doesn't exist, allow creating a new one only if the database state is older than the update.
if ((parentDb != null) && (!replicationSpec.allowReplacementInto(parentDb.getParameters()))) {
// If the target table exists and is newer or same as current update based on repl.last.id, then just noop it.
+ x.getLOG().info("Table {}.{} is not created as the database is newer than the update",
+ tblDesc.getDatabaseName(), tblDesc.getTableName());
return;
}
}
+ if (updatedMetadata != null) {
+ updatedMetadata.set(replicationSpec.getReplicationState(),
+ tblDesc.getDatabaseName(),
+ tblDesc.getTableName(),
+ null);
+ }
+
if (tblDesc.getLocation() == null) {
if (!waitOnPrecursor){
tblDesc.setLocation(wh.getDefaultTablePath(parentDb, tblDesc.getTableName()).toString());
@@ -847,8 +842,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
behave like a noop or a pure MD alter.
*/
if (table == null) {
- // Either we're dropping and re-creating, or the table didn't exist, and we're creating.
-
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
}
@@ -862,20 +855,17 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
addPartitionDesc.setReplicationSpec(replicationSpec);
t.addDependentTask(
addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
+ if (updatedMetadata != null) {
+ updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+ }
}
} else {
x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),replicationSpec, x));
}
}
- if (dr == null){
- // Simply create
- x.getTasks().add(t);
- } else {
- // Drop and recreate
- dr.addDependentTask(t);
- x.getTasks().add(dr);
- }
+ // Simply create
+ x.getTasks().add(t);
} else {
// Table existed, and is okay to replicate into, not dropping and re-creating.
if (table.isPartitioned()) {
@@ -889,6 +879,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (!replicationSpec.isMetadataOnly()){
x.getTasks().add(addSinglePartition(
fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
+ if (updatedMetadata != null) {
+ updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+ }
}
} else {
// If replicating, then the partition already existing means we need to replace, maybe, if
@@ -901,14 +894,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getTasks().add(alterSinglePartition(
fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
}
+ if (updatedMetadata != null) {
+ updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+ }
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
}
- } else {
- // ignore this ptn, do nothing, not an error.
}
}
-
}
if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()){
// MD-ONLY table alter
@@ -919,9 +912,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
} else {
x.getLOG().debug("table non-partitioned");
- if (!replicationSpec.allowReplacementInto(table.getParameters())){
- return; // silently return, table is newer than our replacement.
- }
if (!replicationSpec.isMetadataOnly()) {
// repl-imports are replace-into unless the event is insert-into
loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), replicationSpec, x);
@@ -934,7 +924,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
x.getOutputs().add(new WriteEntity(table,lockType));
-
}
public static boolean isPartitioned(ImportTableDesc tblDesc) {
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index d4fc340..3e2c513 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.parse;
import org.antlr.runtime.tree.Tree;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
@@ -45,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -328,9 +331,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
int evstage = 0;
int evIter = 0;
- Long lastEvid = null;
- Map<String,Long> dbsUpdated = new ReplicationSpec.ReplStateMap<String,Long>();
- Map<String,Long> tablesUpdated = new ReplicationSpec.ReplStateMap<String,Long>();
REPL_STATE_LOG.info("Repl Load: Started analyzing Repl load for DB: {} from path {}, Dump Type: INCREMENTAL",
(null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?",
@@ -360,8 +360,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
String locn = dir.getPath().toUri().toString();
DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf);
List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
- dbNameOrPattern, tblNameOrPattern, locn, taskChainTail,
- dbsUpdated, tablesUpdated, eventDmd);
+ dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, eventDmd);
evIter++;
REPL_STATE_LOG.info("Repl Load: Analyzed load for event {}/{} " +
"with ID: {}, Type: {}, Path: {}",
@@ -380,80 +379,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
taskChainTail = barrierTask;
evstage++;
- lastEvid = dmd.getEventTo();
}
}
-
- // Now, we need to update repl.last.id for the various parent objects that were updated.
- // This update logic will work differently based on what "level" REPL LOAD was run on.
- // a) If this was a REPL LOAD at a table level, i.e. both dbNameOrPattern and
- // tblNameOrPattern were specified, then the table is the only thing we should
- // update the repl.last.id for.
- // b) If this was a db-level REPL LOAD, then we should update the db, as well as any
- // tables affected by partition level operations. (any table level ops will
- // automatically be updated as the table gets updated. Note - renames will need
- // careful handling.
- // c) If this was a wh-level REPL LOAD, then we should update every db for which there
- // were events occurring, as well as tables for which there were ptn-level ops
- // happened. Again, renames must be taken care of.
- //
- // So, what we're going to do is have each event load update dbsUpdated and tablesUpdated
- // accordingly, but ignore updates to tablesUpdated & dbsUpdated in the case of a
- // table-level REPL LOAD, using only the table itself. In the case of a db-level REPL
- // LOAD, we ignore dbsUpdated, but inject our own, and do not ignore tblsUpdated.
- // And for wh-level, we do no special processing, and use all of dbsUpdated and
- // tblsUpdated as-is.
-
- // Additional Note - although this var says "dbNameOrPattern", on REPL LOAD side,
- // we do not support a pattern It can be null or empty, in which case
- // we re-use the existing name from the dump, or it can be specified,
- // in which case we honour it. However, having this be a pattern is an error.
- // Ditto for tblNameOrPattern.
-
-
- if (evstage > 0){
- if ((tblNameOrPattern != null) && (!tblNameOrPattern.isEmpty())){
- // if tblNameOrPattern is specified, then dbNameOrPattern will be too, and
- // thus, this is a table-level REPL LOAD - only table needs updating.
- // If any of the individual events logged any other dbs as having changed,
- // null them out.
- dbsUpdated.clear();
- tablesUpdated.clear();
- tablesUpdated.put(dbNameOrPattern + "." + tblNameOrPattern, lastEvid);
- } else if ((dbNameOrPattern != null) && (!dbNameOrPattern.isEmpty())){
- // if dbNameOrPattern is specified and tblNameOrPattern isn't, this is a
- // db-level update, and thus, the database needs updating. In addition.
- dbsUpdated.clear();
- dbsUpdated.put(dbNameOrPattern, lastEvid);
- }
- }
-
- for (String tableName : tablesUpdated.keySet()){
- // weird - AlterTableDesc requires a HashMap to update props instead of a Map.
- HashMap<String, String> mapProp = new HashMap<>();
- String eventId = tablesUpdated.get(tableName).toString();
-
- mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId);
- AlterTableDesc alterTblDesc = new AlterTableDesc(
- AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(eventId, eventId));
- alterTblDesc.setProps(mapProp);
- alterTblDesc.setOldName(tableName);
- Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
- new DDLWork(inputs, outputs, alterTblDesc), conf);
- taskChainTail.addDependentTask(updateReplIdTask);
- taskChainTail = updateReplIdTask;
- }
- for (String dbName : dbsUpdated.keySet()){
- Map<String, String> mapProp = new HashMap<>();
- String eventId = dbsUpdated.get(dbName).toString();
-
- mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId);
- AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(eventId, eventId));
- Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
- new DDLWork(inputs, outputs, alterDbDesc), conf);
- taskChainTail.addDependentTask(updateReplIdTask);
- taskChainTail = updateReplIdTask;
- }
rootTasks.add(evTaskRoot);
REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl load for DB: {} from path {} and created import " +
"(DDL/COPY/MOVE) tasks",
@@ -465,12 +392,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// TODO : simple wrap & rethrow for now, clean up with error codes
throw new SemanticException(e);
}
-
}
private List<Task<? extends Serializable>> analyzeEventLoad(
- String dbName, String tblName, String location, Task<? extends Serializable> precursor,
- Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated, DumpMetaData dmd)
+ String dbName, String tblName,
+ String location,
+ Task<? extends Serializable> precursor,
+ DumpMetaData dmd)
throws SemanticException {
MessageHandler.Context context =
new MessageHandler.Context(dbName, tblName, location, precursor, dmd, conf, db, ctx, LOG);
@@ -484,10 +412,110 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
precursor.getClass(), precursor.getId(), t.getClass(), t.getId());
}
}
- dbsUpdated.putAll(messageHandler.databasesUpdated());
- tablesUpdated.putAll(messageHandler.tablesUpdated());
inputs.addAll(messageHandler.readEntities());
outputs.addAll(messageHandler.writeEntities());
+ return addUpdateReplStateTasks(StringUtils.isEmpty(tblName),
+ messageHandler.getUpdatedMetadata(), tasks);
+ }
+
+ private Task<? extends Serializable> tableUpdateReplStateTask(
+ String dbName,
+ String tableName,
+ Map<String, String> partSpec,
+ String replState,
+ Task<? extends Serializable> preCursor) {
+ HashMap<String, String> mapProp = new HashMap<>();
+ mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+ AlterTableDesc alterTblDesc = new AlterTableDesc(
+ AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState));
+ alterTblDesc.setProps(mapProp);
+ alterTblDesc.setOldName(dbName + "." + tableName);
+ alterTblDesc.setPartSpec((HashMap<String, String>)partSpec);
+
+ Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
+ new DDLWork(inputs, outputs, alterTblDesc), conf);
+
+ // Link the update repl state task with dependency collection task
+ if (preCursor != null) {
+ preCursor.addDependentTask(updateReplIdTask);
+ LOG.debug("Added {}:{} as a precursor of {}:{}",
+ preCursor.getClass(), preCursor.getId(),
+ updateReplIdTask.getClass(), updateReplIdTask.getId());
+ }
+ return updateReplIdTask;
+ }
+
+ private Task<? extends Serializable> dbUpdateReplStateTask(
+ String dbName,
+ String replState,
+ Task<? extends Serializable> preCursor) {
+ HashMap<String, String> mapProp = new HashMap<>();
+ mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+ AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(
+ dbName, mapProp, new ReplicationSpec(replState, replState));
+ Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
+ new DDLWork(inputs, outputs, alterDbDesc), conf);
+
+ // Link the update repl state task with dependency collection task
+ if (preCursor != null) {
+ preCursor.addDependentTask(updateReplIdTask);
+ LOG.debug("Added {}:{} as a precursor of {}:{}",
+ preCursor.getClass(), preCursor.getId(),
+ updateReplIdTask.getClass(), updateReplIdTask.getId());
+ }
+ return updateReplIdTask;
+ }
+
+ private List<Task<? extends Serializable>> addUpdateReplStateTasks(
+ boolean isDatabaseLoad,
+ UpdatedMetaDataTracker updatedMetadata,
+ List<Task<? extends Serializable>> importTasks) {
+ String replState = updatedMetadata.getReplicationState();
+ String dbName = updatedMetadata.getDatabase();
+ String tableName = updatedMetadata.getTable();
+
+ // If the event generated no import tasks, or no table was updated for a table-level load,
+ // then there is no need to update the repl state on any object.
+ if (importTasks.isEmpty() || (!isDatabaseLoad && (tableName == null))) {
+ LOG.debug("No objects need update of repl state: Either 0 import tasks or table level load");
+ return importTasks;
+ }
+
+ // Create a barrier task for dependency collection of import tasks
+ Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
+
+ // Link import tasks to the barrier task, which will in turn be linked with the repl state update tasks
+ for (Task<? extends Serializable> t : importTasks){
+ t.addDependentTask(barrierTask);
+ LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
+ t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
+ }
+
+ List<Task<? extends Serializable>> tasks = new ArrayList<>();
+ Task<? extends Serializable> updateReplIdTask;
+
+ // If any partition is updated, then update repl state in partition object
+ for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
+ updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
+ tasks.add(updateReplIdTask);
+ }
+
+ if (tableName != null) {
+ // If any table/partition is updated, then update repl state in table object
+ updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
+ tasks.add(updateReplIdTask);
+ }
+
+ // For a table-level load, there is no need to update the replication state of the database
+ if (isDatabaseLoad) {
+ // If any table/partition is updated, then update repl state in db object
+ updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
+ tasks.add(updateReplIdTask);
+ }
+
+ // At least one task will have been added to update the repl state
return tasks;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 4badea6..1c54d29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
import javax.annotation.Nullable;
import java.text.Collator;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -39,6 +38,7 @@ public class ReplicationSpec {
private boolean isInReplicationScope = false; // default is that it's not in a repl scope
private boolean isMetadataOnly = false; // default is full export/import, not metadata-only
+ private boolean isIncrementalDump = false; // default is replv2 bootstrap dump or replv1 export or import/load.
private String eventId = null;
private String currStateId = null;
private boolean isNoop = false;
@@ -71,34 +71,6 @@ public class ReplicationSpec {
static private Collator collator = Collator.getInstance();
/**
- * Class that extends HashMap with a slightly different put semantic, where
- * put behaves as follows:
- * a) If the key does not already exist, then retains existing HashMap.put behaviour
- * b) If the map already contains an entry for the given key, then will replace only
- * if the new value is "greater" than the old value.
- *
- * The primary goal for this is to track repl updates for dbs and tables, to replace state
- * only if the state is newer.
- */
- public static class ReplStateMap<K,V extends Comparable> extends HashMap<K,V> {
- @Override
- public V put(K k, V v){
- if (!containsKey(k)){
- return super.put(k,v);
- }
- V oldValue = get(k);
- if (v.compareTo(oldValue) > 0){
- return super.put(k,v);
- }
- // we did no replacement, but return the old value anyway. This
- // seems most consistent with HashMap behaviour, becuse the "put"
- // was effectively processed and consumed, although we threw away
- // the enw value.
- return oldValue;
- }
- }
-
- /**
* Constructor to construct spec based on either the ASTNode that
* corresponds to the replication clause itself, or corresponds to
* the parent node, and will scan through the children to instantiate
@@ -134,14 +106,16 @@ public class ReplicationSpec {
}
public ReplicationSpec(String fromId, String toId) {
- this(true, false, fromId, toId, false, true, false);
+ this(true, false, false, fromId, toId, false, true, false);
}
public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
+ boolean isIncrementalDump,
String eventReplicationState, String currentReplicationState,
boolean isNoop, boolean isLazy, boolean isReplace) {
this.isInReplicationScope = isInReplicationScope;
this.isMetadataOnly = isMetadataOnly;
+ this.isIncrementalDump = isIncrementalDump;
this.eventId = eventReplicationState;
this.currStateId = currentReplicationState;
this.isNoop = isNoop;
@@ -151,8 +125,9 @@ public class ReplicationSpec {
public ReplicationSpec(Function<String, String> keyFetcher) {
String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString());
- this.isMetadataOnly = false;
this.isInReplicationScope = false;
+ this.isMetadataOnly = false;
+ this.isIncrementalDump = false;
if (scope != null) {
if (scope.equalsIgnoreCase("metadata")) {
this.isMetadataOnly = true;
@@ -258,6 +233,17 @@ public class ReplicationSpec {
}
/**
+ * @return true if this statement refers to incremental dump operation.
+ */
+ public boolean isIncrementalDump(){
+ return isIncrementalDump;
+ }
+
+ public void setIsIncrementalDump(boolean isIncrementalDump){
+ this.isIncrementalDump = isIncrementalDump;
+ }
+
+ /**
* @return true if this statement refers to metadata-only operation.
*/
public boolean isMetadataOnly(){
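The new isIncrementalDump flag threads through every ReplicationSpec constructor call above, and the dump-side serializers consult it to decide whether to stamp repl.last.id at dump time. A small sketch using only the constructor and accessors visible in this hunk, assuming hive-exec on the classpath; the event id "42" is invented for the demo:

import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

public class IncrementalSpecDemo {
    public static void main(String[] args) {
        // Event-only spec for an incremental dump: the event id doubles as both
        // the event state and the object state (see getNewEventOnlyReplicationSpec).
        ReplicationSpec spec = new ReplicationSpec(
            true,     // isInReplicationScope
            false,    // isMetadataOnly
            true,     // isIncrementalDump -- the new flag
            "42",     // eventReplicationState
            "42",     // currentReplicationState
            false,    // isNoop
            true,     // isLazy
            true);    // isReplace

        // For an incremental dump, PartitionSerializer/TableSerializer skip
        // stamping repl.last.id on the dumped objects; only bootstrap dumps set it.
        System.out.println(spec.isIncrementalDump()); // true
    }
}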
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
index ae37c73..6a05ea4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
@@ -32,18 +32,20 @@ class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<Rep
@Override
public ReplicationSpec fromMetaStore() throws HiveException {
try {
+ long currentNotificationId = db.getMSC()
+ .getCurrentNotificationEventId().getEventId();
ReplicationSpec replicationSpec =
new ReplicationSpec(
true,
false,
+ false,
"replv2",
"will-be-set",
false,
true,
false
);
- long currentNotificationId = db.getMSC()
- .getCurrentNotificationEventId().getEventId();
+
replicationSpec.setCurrentReplicationState(String.valueOf(currentNotificationId));
return replicationSpec;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index 077d39b..2c7414f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -42,9 +42,13 @@ public class PartitionSerializer implements JsonWriter.Serializer {
TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
try {
if (additionalPropertiesProvider.isInReplicationScope()) {
- partition.putToParameters(
- ReplicationSpec.KEY.CURR_STATE_ID.toString(),
- additionalPropertiesProvider.getCurrentReplicationState());
+ // Current replication state must be set on the Partition object only for bootstrap dump.
+ // The event replication state will be null in case of a bootstrap dump.
+ if (!additionalPropertiesProvider.isIncrementalDump()) {
+ partition.putToParameters(
+ ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+ additionalPropertiesProvider.getCurrentReplicationState());
+ }
if (isPartitionExternal()) {
// Replication destination will not be external
partition.putToParameters("EXTERNAL", "FALSE");
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 948cb39..c443e53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -68,17 +68,21 @@ public class TableSerializer implements JsonWriter.Serializer {
private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider)
throws SemanticException, IOException {
if (additionalPropertiesProvider.isInReplicationScope()) {
- table.putToParameters(
- ReplicationSpec.KEY.CURR_STATE_ID.toString(),
- additionalPropertiesProvider.getCurrentReplicationState());
+ // Current replication state must be set on the Table object only for bootstrap dump.
+ // The event replication state will be null in case of a bootstrap dump.
+ if (!additionalPropertiesProvider.isIncrementalDump()) {
+ table.putToParameters(
+ ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+ additionalPropertiesProvider.getCurrentReplicationState());
+ }
if (isExternalTable(table)) {
// Replication destination will not be external - override if set
table.putToParameters("EXTERNAL", "FALSE");
- }
+ }
if (isExternalTableType(table)) {
// Replication dest will not be external - override if set
table.setTableType(TableType.MANAGED_TABLE.toString());
- }
+ }
} else {
// ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
// write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\"");
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
new file mode 100644
index 0000000..5714b21
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Utility class to help track and return the metadata that is updated by repl load.
+ */
+public class UpdatedMetaDataTracker {
+ private String replState;
+ private String dbName;
+ private String tableName;
+ private List<Map <String, String>> partitionsList;
+
+ public UpdatedMetaDataTracker() {
+ this.replState = null;
+ this.dbName = null;
+ this.tableName = null;
+ this.partitionsList = new ArrayList<>();
+ }
+
+ public void copyUpdatedMetadata(UpdatedMetaDataTracker other) {
+ this.replState = other.replState;
+ this.dbName = other.dbName;
+ this.tableName = other.tableName;
+ this.partitionsList = other.getPartitions();
+ }
+
+ public void set(String replState, String dbName, String tableName, Map <String, String> partSpec) {
+ this.replState = replState;
+ this.dbName = dbName;
+ this.tableName = tableName;
+ if (partSpec != null) {
+ addPartition(partSpec);
+ }
+ }
+
+ public void addPartition(Map <String, String> partSpec) {
+ partitionsList.add(partSpec);
+ }
+
+ public String getReplicationState() {
+ return replState;
+ }
+
+ public String getDatabase() {
+ return dbName;
+ }
+
+ public String getTable() {
+ return tableName;
+ }
+
+ public List<Map <String, String>> getPartitions() {
+ return partitionsList;
+ }
+
+}
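For orientation, a hedged sketch of how this tracker gets exercised, using only the methods defined above; the replication state, database, table, and partition values are invented for the demo:

import java.util.Collections;
import java.util.Map;

import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;

public class TrackerDemo {
    public static void main(String[] args) {
        UpdatedMetaDataTracker tracker = new UpdatedMetaDataTracker();

        // A load-side handler records the event's replication state plus the
        // db/table it modified; partSpec may be null for table-level events.
        tracker.set("101", "salesdb", "ptned", null);

        // Partition-level events add each touched partition spec.
        tracker.addPartition(Collections.singletonMap("b", "1"));

        // The analyzer later reads these back to build the repl.last.id
        // update tasks that run behind the barrier task.
        System.out.println(tracker.getReplicationState()); // 101
        System.out.println(tracker.getDatabase() + "." + tracker.getTable());
        for (Map<String, String> partSpec : tracker.getPartitions()) {
            System.out.println(partSpec);
        }
    }
}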
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
index d6a95bf..5b26681 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
@@ -21,19 +21,15 @@ import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
abstract class AbstractMessageHandler implements MessageHandler {
final HashSet<ReadEntity> readEntitySet = new HashSet<>();
final HashSet<WriteEntity> writeEntitySet = new HashSet<>();
- final Map<String, Long> tablesUpdated = new HashMap<>(),
- databasesUpdated = new HashMap<>();
+ final UpdatedMetaDataTracker updatedMetadata = new UpdatedMetaDataTracker();
final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
@Override
@@ -47,13 +43,6 @@ abstract class AbstractMessageHandler implements MessageHandler {
}
@Override
- public Map<String, Long> tablesUpdated() {
- return tablesUpdated;
- }
-
- @Override
- public Map<String, Long> databasesUpdated() {
- return databasesUpdated;
- }
+ public UpdatedMetaDataTracker getUpdatedMetadata() { return updatedMetadata; }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
index 39697bb..0873c1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
@@ -64,8 +64,7 @@ public class AddForeignKeyHandler extends AbstractMessageHandler {
List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
tasks.add(addConstraintsTask);
context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
- databasesUpdated.put(actualDbName, context.dmd.getEventTo());
- tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
return Collections.singletonList(addConstraintsTask);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
index e2c1d1d..76cbe5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
@@ -65,8 +65,7 @@ public class AddNotNullConstraintHandler extends AbstractMessageHandler {
List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
tasks.add(addConstraintsTask);
context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
- databasesUpdated.put(actualDbName, context.dmd.getEventTo());
- tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
return Collections.singletonList(addConstraintsTask);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
index 7babb6a..aee46da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
@@ -62,8 +62,7 @@ public class AddPrimaryKeyHandler extends AbstractMessageHandler {
List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
tasks.add(addConstraintsTask);
context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
- databasesUpdated.put(actualDbName, context.dmd.getEventTo());
- tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
return Collections.singletonList(addConstraintsTask);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
index e7b404a..f0cb11e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
@@ -63,8 +63,7 @@ public class AddUniqueConstraintHandler extends AbstractMessageHandler {
List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
tasks.add(addConstraintsTask);
context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
- databasesUpdated.put(actualDbName, context.dmd.getEventTo());
- tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
return Collections.singletonList(addConstraintsTask);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
index a6d35cf..3f176aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -67,7 +67,8 @@ public class CreateFunctionHandler extends AbstractMessageHandler {
// bootstrap. There should be a better way to do this but it might require a lot of changes across
// different handlers, unless this is a common pattern that is seen, leaving this here.
if (context.dmd != null) {
- databasesUpdated.put(builder.destinationDbName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), builder.destinationDbName,
+ null, null);
}
readEntitySet.add(toReadEntity(new Path(context.location), context.hiveConf));
if (builder.replCopyTasks.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
index 58aa214..459fac5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
@@ -44,8 +44,7 @@ public class DropConstraintHandler extends AbstractMessageHandler {
List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
tasks.add(dropConstraintsTask);
context.log.debug("Added drop constraint task : {}:{}", dropConstraintsTask.getId(), actualTblName);
- databasesUpdated.put(actualDbName, context.dmd.getEventTo());
- tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
return Collections.singletonList(dropConstraintsTask);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
index dae300f..fee2bb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
@@ -43,7 +43,7 @@ public class DropFunctionHandler extends AbstractMessageHandler {
context.log.debug(
"Added drop function task : {}:{}", dropFunctionTask.getId(), desc.getFunctionName()
);
- databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, null, null);
return Collections.singletonList(dropFunctionTask);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
index 771400e..5456416 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -58,8 +58,7 @@ public class DropPartitionHandler extends AbstractMessageHandler {
);
context.log.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
dropPtnDesc.getTableName(), msg.getPartitions());
- databasesUpdated.put(actualDbName, context.dmd.getEventTo());
- tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
return Collections.singletonList(dropPtnTask);
} else {
throw new SemanticException(
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
index 3ee3949..d2f5248 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
@@ -46,7 +46,7 @@ public class DropTableHandler extends AbstractMessageHandler {
context.log.debug(
"Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName()
);
- databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, null, null);
return Collections.singletonList(dropTableTask);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
index 40ed0b2..d412fd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
@@ -40,8 +40,7 @@ public class InsertHandler extends AbstractMessageHandler {
List<Task<? extends Serializable>> tasks = tableHandler.handle(currentContext);
readEntitySet.addAll(tableHandler.readEntities());
writeEntitySet.addAll(tableHandler.writeEntities());
- databasesUpdated.putAll(tableHandler.databasesUpdated);
- tablesUpdated.putAll(tableHandler.tablesUpdated);
+ getUpdatedMetadata().copyUpdatedMetadata(tableHandler.getUpdatedMetadata());
return tasks;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
index 33c716f..8daff6d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -25,11 +25,11 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
import org.slf4j.Logger;
import java.io.Serializable;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
@@ -42,9 +42,7 @@ public interface MessageHandler {
Set<WriteEntity> writeEntities();
- Map<String, Long> tablesUpdated();
-
- Map<String, Long> databasesUpdated();
+ UpdatedMetaDataTracker getUpdatedMetadata();
class Context {
final String dbName, tableName, location;
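The MessageHandler interface now exposes the single UpdatedMetaDataTracker in place of the
two maps. The tracker class itself is added elsewhere in this commit and is not shown in
this excerpt; a minimal sketch of what the handlers above rely on, with assumed field and
accessor names, might look like:

    package org.apache.hadoop.hive.ql.parse.repl.load;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Sketch only: field and accessor names here are assumptions, not the
    // committed implementation.
    public class UpdatedMetaDataTracker {
      private String replState;  // last replicated event id, as a string
      private String dbName;
      private String tableName;
      private List<Map<String, String>> partitionsList = new ArrayList<>();

      // Called once per event by each handler; nulls mean "not applicable".
      public void set(String replState, String dbName, String tableName,
          Map<String, String> partSpec) {
        this.replState = replState;
        this.dbName = dbName;
        this.tableName = tableName;
        this.partitionsList = new ArrayList<>();
        if (partSpec != null) {
          partitionsList.add(partSpec);
        }
      }

      // Lets a delegating handler (e.g. InsertHandler above) adopt the
      // metadata recorded by the handler it wrapped.
      public void copyUpdatedMetadata(UpdatedMetaDataTracker other) {
        this.replState = other.replState;
        this.dbName = other.dbName;
        this.tableName = other.tableName;
        this.partitionsList = other.partitionsList;
      }

      public String getReplicationState() { return replState; }
      public String getDatabase() { return dbName; }
      public String getTable() { return tableName; }
      public List<Map<String, String>> getPartitions() { return partitionsList; }
    }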
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
index 5bd0532..43f2cbc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
@@ -65,8 +65,7 @@ public class RenamePartitionHandler extends AbstractMessageHandler {
new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf);
context.log.debug("Added rename ptn task : {}:{}->{}",
renamePtnTask.getId(), oldPartSpec, newPartSpec);
- databasesUpdated.put(actualDbName, context.dmd.getEventTo());
- tablesUpdated.put(tableName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, newPartSpec);
return Collections.singletonList(renamePtnTask);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
index 4785e55..e30abad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
@@ -65,9 +65,8 @@ public class RenameTableHandler extends AbstractMessageHandler {
renameTableTask.getId(), oldName, newName);
// oldDbName and newDbName *will* be the same if we're here
- databasesUpdated.put(newDbName, context.dmd.getEventTo());
- tablesUpdated.remove(oldName);
- tablesUpdated.put(newName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), newDbName,
+ msg.getTableObjAfter().getTableName(), null);
// Note: edge-case here in interaction with table-level REPL LOAD, where that nukes out
// tablesUpdated. However, we explicitly don't support repl of that sort, and error out above.
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
index 65e1d6a..2c5c2d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -47,8 +47,7 @@ public class TableHandler extends AbstractMessageHandler {
// Also, REPL LOAD doesn't support external table and hence no location set as well.
ImportSemanticAnalyzer.prepareImport(false, false, false,
(context.precursor != null), null, context.tableName, context.dbName,
- null, context.location, x,
- databasesUpdated, tablesUpdated);
+ null, context.location, x, updatedMetadata);
return importTasks;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
index 3a8990a..b983f95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
@@ -62,8 +62,7 @@ public class TruncatePartitionHandler extends AbstractMessageHandler {
context.hiveConf);
context.log.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(),
truncateTableDesc.getTableName());
- databasesUpdated.put(actualDbName, context.dmd.getEventTo());
- tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, partSpec);
return Collections.singletonList(truncatePtnTask);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
index 93ffa29..c6d7739 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
@@ -44,8 +44,7 @@ public class TruncateTableHandler extends AbstractMessageHandler {
context.log.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(),
truncateTableDesc.getTableName());
- databasesUpdated.put(actualDbName, context.dmd.getEventTo());
- tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
return Collections.singletonList(truncateTableTask);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/60ae30ad/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java
index 7410e5a..ca6f090 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
* AlterDatabaseDesc.
*
*/
-@Explain(displayName = "Create Database", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+@Explain(displayName = "Alter Database", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public class AlterDatabaseDesc extends DDLDesc implements Serializable {
private static final long serialVersionUID = 1L;