Posted to commits@hive.apache.org by an...@apache.org on 2020/03/09 03:27:12 UTC
[hive] branch master updated: HIVE-22954: Schedule Repl Load using Hive Scheduler (Aasha Medhi, reviewed by Anishek Agarwal / Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 3b83836 HIVE-22954: Schedule Repl Load using Hive Scheduler (Aasha Medhi, reviewed by Anishek Agarwal / Pravin Kumar Sinha)
3b83836 is described below
commit 3b838362d3ba6300f30e830f02dd219e7005a3d7
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Mon Mar 9 08:56:58 2020 +0530
HIVE-22954: Schedule Repl Load using Hive Scheduler (Aasha Medhi, reviewed by Anishek Agarwal / Pravin Kumar Sinha)
---
.../ql/parse/BaseReplicationAcrossInstances.java | 2 +
.../parse/BaseReplicationScenariosAcidTables.java | 3 +
.../hadoop/hive/ql/parse/ReplicationTestUtils.java | 4 +-
.../apache/hadoop/hive/ql/parse/TestCopyUtils.java | 6 +-
.../ql/parse/TestMetaStoreEventListenerInRepl.java | 13 +-
.../ql/parse/TestReplicationOfHiveStreaming.java | 61 ++---
.../parse/TestReplicationOnHDFSEncryptedZones.java | 9 +-
.../hive/ql/parse/TestReplicationScenarios.java | 170 +++++++------
.../parse/TestReplicationScenariosAcidTables.java | 69 +++---
...estReplicationScenariosAcidTablesBootstrap.java | 32 +--
.../TestReplicationScenariosAcrossInstances.java | 268 ++++++---------------
.../TestReplicationScenariosExternalTables.java | 78 +++---
...icationScenariosExternalTablesMetaDataOnly.java | 46 ++--
...licationScenariosIncrementalLoadAcidTables.java | 10 +-
.../parse/TestReplicationWithTableMigration.java | 51 ++--
.../parse/TestReplicationWithTableMigrationEx.java | 52 ++--
.../parse/TestScheduledReplicationScenarios.java | 52 ++--
.../ql/parse/TestStatsReplicationScenarios.java | 34 +--
.../parse/TestTableLevelReplicationScenarios.java | 16 +-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 34 ++-
.../java/org/apache/hive/jdbc/TestJdbcDriver2.java | 12 +-
.../org/apache/hive/jdbc/TestJdbcWithMiniHS2.java | 5 +-
.../org/apache/hadoop/hive/ql/parse/HiveParser.g | 14 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 27 ++-
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 150 ++++++++----
.../hadoop/hive/ql/parse/TestParseUtils.java | 2 +-
.../ql/parse/TestReplicationSemanticAnalyzer.java | 82 +------
.../clientnegative/repl_dump_requires_admin.q | 2 +-
.../clientnegative/repl_load_requires_admin.q | 4 +-
.../queries/clientpositive/repl_load_old_version.q | 10 -
.../clientnegative/repl_load_requires_admin.q.out | 3 +-
31 files changed, 595 insertions(+), 726 deletions(-)
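At the core of this patch, REPL LOAD stops addressing a dump by filesystem path and instead names the source database; the staging directory is resolved from the configured repl root. A sketch (not itself part of the patch) of the command-shape change the commit drives through HiveParser.g and ReplicationSemanticAnalyzer.java, using variable names from the test diffs below:

    // Old form: target db first, dump addressed by an explicit path.
    run("REPL LOAD " + replDbName + " FROM '" + dumpLocation + "'", driverMirror);

    // New form: source db (or pattern) first, target after INTO; the dump is
    // located under the repl root dir (REPLDIR, i.e. hive.repl.rootdir).
    run("REPL LOAD " + sourceDbNameOrPattern + " INTO " + replDbName, driverMirror);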
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
index d321cca..b805b19 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.shims.Utils;
import org.junit.After;
import org.junit.AfterClass;
@@ -57,6 +58,7 @@ public class BaseReplicationAcrossInstances {
}};
localOverrides.putAll(overrides);
primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+ localOverrides.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
index cb6bd2d..38580c1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.DriverFactory;
@@ -87,6 +88,7 @@ public class BaseReplicationScenariosAcidTables {
acidEnableConf.putAll(overrides);
primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+ acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
@@ -94,6 +96,7 @@ public class BaseReplicationScenariosAcidTables {
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
put("hive.metastore.client.capability.check", "false");
}};
+ overridesForHiveConf1.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
}
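The replica wiring added to both base classes follows a single pattern; a condensed sketch using only names from the hunks above:

    // Create the primary first so its dump root (primary.repldDir) is known,
    // then point the replica's repl root at it: REPL LOAD now locates dumps
    // for a source db under this directory instead of taking an explicit path.
    primary = new WarehouseInstance(LOG, miniDFSCluster, conf);
    conf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
    replica = new WarehouseInstance(LOG, miniDFSCluster, conf);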
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
index 936acc4..a82bbad 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
@@ -265,11 +265,11 @@ public class ReplicationTestUtils {
List<String> selectStmtList,
List<String[]> expectedValues, String lastReplId) throws Throwable {
WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
verifyResultsInReplica(replica, replicatedDbName, selectStmtList, expectedValues);
- replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
verifyResultsInReplica(replica, replicatedDbName, selectStmtList, expectedValues);
return incrementalDump;
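The rewritten utility keeps its idempotency contract: applying the same incremental dump twice must leave REPL STATUS at the same id. In sketch form, with names from this hunk:

    WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName);
    // First load applies the dumped events and advances the status.
    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
        .run("REPL STATUS " + replicatedDbName)
        .verifyResult(incrementalDump.lastReplicationId);
    // Re-loading the same dump must be a no-op: the status stays put.
    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
        .run("REPL STATUS " + replicatedDbName)
        .verifyResult(incrementalDump.lastReplicationId);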
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
index 77a0d33..9648c72 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
@@ -95,6 +96,7 @@ public class TestCopyUtils {
put(ConfVars.HIVE_DISTCP_DOAS_USER.varname, currentUser);
}};
primary = new WarehouseInstanceWithMR(LOG, miniDFSCluster, overridesForHiveConf);
+ overridesForHiveConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replica = new WarehouseInstanceWithMR(LOG, miniDFSCluster, overridesForHiveConf);
}
@@ -121,7 +123,7 @@ public class TestCopyUtils {
*/
@Test
public void testPrivilegedDistCpWithSameUserAsCurrentDoesNotTryToImpersonate() throws Throwable {
- WarehouseInstance.Tuple tuple = primary
+ primary
.run("use " + primaryDbName)
.run("create table t1 (id int)")
.run("insert into t1 values (1),(2),(3)")
@@ -132,7 +134,7 @@ public class TestCopyUtils {
We have to do a comparison on the data of table t1 in the replicated database because even though the file
copy will fail due to impersonation failure, the driver will return a success code 0. Maybe something to look at later.
*/
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("select * from " + replicatedDbName + ".t1")
.verifyResults(Arrays.asList("1", "2", "3", "12", "11", "13"));
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
index 7b0f634..703d16f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
@@ -84,6 +84,7 @@ public class TestMetaStoreEventListenerInRepl {
}};
primary = new WarehouseInstance(LOG, miniDFSCluster, conf);
+ conf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replica = new WarehouseInstance(LOG, miniDFSCluster, conf);
}
@@ -172,26 +173,26 @@ public class TestMetaStoreEventListenerInRepl {
@Test
public void testReplEvents() throws Throwable {
Map<String, Set<String>> eventsMap = prepareBootstrapData(primaryDbName);
- WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.dump(primaryDbName);
- replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
ReplMetaStoreEventListenerTestImpl.clearSanityData();
eventsMap = prepareIncData(primaryDbName);
LOG.info(testName.getMethodName() + ": first incremental dump and load.");
- WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.dump(primaryDbName);
- replica.load(replicatedDbName, incDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
ReplMetaStoreEventListenerTestImpl.clearSanityData();
// Second incremental, after bootstrap
eventsMap = prepareInc2Data(primaryDbName);
LOG.info(testName.getMethodName() + ": second incremental dump and load.");
- WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.dump(primaryDbName);
- replica.load(replicatedDbName, inc2Dump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
ReplMetaStoreEventListenerTestImpl.clearSanityData();
}
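Throughout this file the Tuple returned by dump() is dropped wherever only the load follows, since load() now takes the source db name rather than the dump location. The reduced dump/load cycle, as a sketch:

    // Dump on the primary; the returned location is no longer threaded through.
    primary.run("use " + primaryDbName).dump(primaryDbName);
    // Load on the replica by source db name; the dump dir is found via REPLDIR.
    replica.load(replicatedDbName, primaryDbName);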
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOfHiveStreaming.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOfHiveStreaming.java
index 5c8f902..62457b3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOfHiveStreaming.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOfHiveStreaming.java
@@ -91,6 +91,7 @@ public class TestReplicationOfHiveStreaming {
acidEnableConf.putAll(overrides);
primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+ acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
}
@@ -116,8 +117,8 @@ public class TestReplicationOfHiveStreaming {
@Test
public void testHiveStreamingUnpartitionedWithTxnBatchSizeAsOne() throws Throwable {
- WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, bootstrapDump.dumpLocation);
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
// Create an ACID table.
String tblName = "alerts";
@@ -148,8 +149,8 @@ public class TestReplicationOfHiveStreaming {
connection.commitTransaction();
// Replicate the committed data which should be visible.
- WarehouseInstance.Tuple incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " order by msg")
.verifyResults((new String[] {"val1", "val2"}));
@@ -160,8 +161,8 @@ public class TestReplicationOfHiveStreaming {
connection.write("4,val4".getBytes());
// Replicate events before committing txn. The uncommitted data shouldn't be seen.
- incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " order by msg")
.verifyResults((new String[] {"val1", "val2"}));
@@ -169,8 +170,8 @@ public class TestReplicationOfHiveStreaming {
connection.commitTransaction();
// After commit, the data should be replicated and visible.
- incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " order by msg")
.verifyResults((new String[] {"val1", "val2", "val3", "val4"}));
@@ -182,8 +183,8 @@ public class TestReplicationOfHiveStreaming {
connection.abortTransaction();
// Aborted data shouldn't be visible.
- incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " order by msg")
.verifyResults((new String[] {"val1", "val2", "val3", "val4"}));
@@ -194,8 +195,8 @@ public class TestReplicationOfHiveStreaming {
@Test
public void testHiveStreamingStaticPartitionWithTxnBatchSizeAsOne() throws Throwable {
- WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, bootstrapDump.dumpLocation);
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
// Create an ACID table.
String tblName = "alerts";
@@ -233,8 +234,8 @@ public class TestReplicationOfHiveStreaming {
connection.commitTransaction();
// Replicate the committed data which should be visible.
- WarehouseInstance.Tuple incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
.verifyResults((new String[] {"val1", "val2"}));
@@ -245,8 +246,8 @@ public class TestReplicationOfHiveStreaming {
connection.write("4,val4".getBytes());
// Replicate events before committing txn. The uncommitted data shouldn't be seen.
- incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
.verifyResults((new String[] {"val1", "val2"}));
@@ -254,8 +255,8 @@ public class TestReplicationOfHiveStreaming {
connection.commitTransaction();
// After commit, the data should be replicated and visible.
- incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
.verifyResults((new String[] {"val1", "val2", "val3", "val4"}));
@@ -267,8 +268,8 @@ public class TestReplicationOfHiveStreaming {
connection.abortTransaction();
// Aborted data shouldn't be visible.
- incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
.verifyResults((new String[] {"val1", "val2", "val3", "val4"}));
@@ -279,8 +280,8 @@ public class TestReplicationOfHiveStreaming {
@Test
public void testHiveStreamingDynamicPartitionWithTxnBatchSizeAsOne() throws Throwable {
- WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, bootstrapDump.dumpLocation);
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
// Create an ACID table.
String tblName = "alerts";
@@ -315,8 +316,8 @@ public class TestReplicationOfHiveStreaming {
connection.commitTransaction();
// Replicate the committed data which should be visible.
- WarehouseInstance.Tuple incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " where continent='Asia' and country='China' order by msg")
.verifyResults((new String[] {"val11"}))
@@ -329,8 +330,8 @@ public class TestReplicationOfHiveStreaming {
connection.write("14,val14,Asia,India".getBytes());
// Replicate events before committing txn. The uncommitted data shouldn't be seen.
- incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
.verifyResults((new String[] {"val12"}));
@@ -338,8 +339,8 @@ public class TestReplicationOfHiveStreaming {
connection.commitTransaction();
// After committing the txn, the data should be visible.
- incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
.verifyResults((new String[] {"val12", "val14"}))
@@ -353,8 +354,8 @@ public class TestReplicationOfHiveStreaming {
connection.abortTransaction();
// Aborted data should not be visible.
- incrDump = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
.verifyResults((new String[] {"val12", "val14"}))
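All three streaming tests exercise the same invariant through the new syntax: only committed streaming transactions are visible after replication. Condensed (connection setup elided; names from this diff):

    connection.beginTransaction();
    connection.write("3,val3".getBytes());
    // Dump/load while the txn is open: the replica must not see the new row.
    primary.dump(primaryDbName);
    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
    connection.commitTransaction();
    // Dump/load after commit: the row is now visible; an aborted txn never is.
    primary.dump(primaryDbName);
    replica.loadWithoutExplain(replicatedDbName, primaryDbName);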
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
index 0a69d63..f6a33bc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
@@ -99,6 +99,7 @@ public class TestReplicationOnHDFSEncryptedZones {
put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
+ put(HiveConf.ConfVars.REPLDIR.varname, primary.repldDir);
}}, "test_key123");
WarehouseInstance.Tuple tuple =
@@ -109,8 +110,8 @@ public class TestReplicationOnHDFSEncryptedZones {
.dump(primaryDbName);
replica
- .run("repl load " + replicatedDbName + " from '" + tuple.dumpLocation
- + "' with('hive.repl.add.raw.reserved.namespace'='true', "
+ .run("repl load " + primaryDbName + " into " + replicatedDbName
+ + " with('hive.repl.add.raw.reserved.namespace'='true', "
+ "'hive.repl.replica.external.table.base.dir'='" + replica.externalTableWarehouseRoot + "', "
+ "'distcp.options.pugpbx'='', 'distcp.options.skipcrccheck'='')")
.run("use " + replicatedDbName)
@@ -140,8 +141,8 @@ public class TestReplicationOnHDFSEncryptedZones {
.dump(primaryDbName);
replica
- .run("repl load " + replicatedDbName + " from '" + tuple.dumpLocation
- + "' with('hive.repl.add.raw.reserved.namespace'='true')")
+ .run("repl load " + primaryDbName + " into " + replicatedDbName
+ + " with('hive.repl.add.raw.reserved.namespace'='true')")
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
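WITH options survive the syntax change unchanged; they simply follow the INTO clause. The new shape from the hunk above, as one statement:

    replica.run("repl load " + primaryDbName + " into " + replicatedDbName
        + " with('hive.repl.add.raw.reserved.namespace'='true', "
        + "'hive.repl.replica.external.table.base.dir'='" + replica.externalTableWarehouseRoot + "', "
        + "'distcp.options.pugpbx'='', 'distcp.options.skipcrccheck'='')");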
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 003533a..b709ce7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -253,7 +253,7 @@ public class TestReplicationScenarios {
private Tuple incrementalLoadAndVerify(String dbName, String replDbName) throws IOException {
Tuple dump = replDumpDb(dbName);
- loadAndVerify(replDbName, dump.dumpLocation, dump.lastReplId);
+ loadAndVerify(replDbName, dbName, dump.lastReplId);
return dump;
}
@@ -267,8 +267,8 @@ public class TestReplicationScenarios {
return new Tuple(dumpLocation, lastReplId);
}
- private void loadAndVerify(String replDbName, String dumpLocation, String lastReplId) throws IOException {
- run("REPL LOAD " + replDbName + " FROM '" + dumpLocation + "'", driverMirror);
+ private void loadAndVerify(String replDbName, String sourceDbNameOrPattern, String lastReplId) throws IOException {
+ run("REPL LOAD " + sourceDbNameOrPattern + " INTO " + replDbName, driverMirror);
verifyRun("REPL STATUS " + replDbName, lastReplId, driverMirror);
return;
}
@@ -393,7 +393,7 @@ public class TestReplicationScenarios {
assertEquals(false, hasMoveTask(task));
assertEquals(true, hasPartitionTask(task));
- loadAndVerify(dbNameReplica, dump.dumpLocation, dump.lastReplId);
+ loadAndVerify(dbNameReplica, dbName, dump.lastReplId);
run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver);
dump = replDumpDb(dbName);
@@ -404,7 +404,7 @@ public class TestReplicationScenarios {
assertEquals(true, hasMoveTask(task));
assertEquals(true, hasPartitionTask(task));
- loadAndVerify(dbNameReplica, dump.dumpLocation, dump.lastReplId);
+ loadAndVerify(dbNameReplica, dbName, dump.lastReplId);
run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver);
dump = replDumpDb(dbName);
@@ -458,7 +458,7 @@ public class TestReplicationScenarios {
// Partition dropped after "repl dump"
run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)", driver);
- run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
verifyRun("REPL STATUS " + replDbName, new String[] {replDumpId}, driverMirror);
verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror);
@@ -495,7 +495,7 @@ public class TestReplicationScenarios {
createDB(dbName + "_withtable", driverMirror);
run("CREATE TABLE " + dbName + "_withtable.unptned(a string) STORED AS TEXTFILE", driverMirror);
// Load using same dump to a DB with table. It should fail as DB is not empty.
- verifyFail("REPL LOAD " + dbName + "_withtable FROM '" + replDumpLocn + "'", driverMirror);
+ verifyFail("REPL LOAD " + dbName + " INTO " + dbName + "_withtable ", driverMirror);
// REPL STATUS should return NULL
verifyRun("REPL STATUS " + dbName + "_withtable", nullReplId, driverMirror);
@@ -505,7 +505,7 @@ public class TestReplicationScenarios {
run("CREATE TABLE " + dbName + "_withview.unptned(a string) STORED AS TEXTFILE", driverMirror);
run("CREATE VIEW " + dbName + "_withview.view AS SELECT * FROM " + dbName + "_withview.unptned", driverMirror);
// Load using same dump to a DB with view. It should fail as DB is not empty.
- verifyFail("REPL LOAD " + dbName + "_withview FROM '" + replDumpLocn + "'", driverMirror);
+ verifyFail("REPL LOAD " + dbName + " INTO " + dbName + "_withview", driverMirror);
// REPL STATUS should return NULL
verifyRun("REPL STATUS " + dbName + "_withview", nullReplId, driverMirror);
@@ -566,7 +566,7 @@ public class TestReplicationScenarios {
String replDumpLocn = getResult(0, 0, driver);
String replDumpId = getResult(0, 1, true, driver);
LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
- run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
// The ptned table should miss in target as the table was marked virtually as dropped
verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror);
@@ -581,7 +581,7 @@ public class TestReplicationScenarios {
String postDropReplDumpLocn = getResult(0,0, driver);
String postDropReplDumpId = getResult(0,1,true,driver);
LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
- assert(run("REPL LOAD " + replDbName + " FROM '" + postDropReplDumpLocn + "'", true, driverMirror));
+ assert(run("REPL LOAD " + dbName + " INTO " + replDbName, true, driverMirror));
verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror);
verifyIfTableNotExist(replDbName, "ptned", metaStoreClientMirror);
@@ -633,7 +633,7 @@ public class TestReplicationScenarios {
String replDumpLocn = getResult(0, 0, driver);
String replDumpId = getResult(0, 1, true, driver);
LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
- run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
// All partitions should miss in target as it was marked virtually as dropped
verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", empty, driverMirror);
@@ -650,7 +650,7 @@ public class TestReplicationScenarios {
String postDropReplDumpLocn = getResult(0,0,driver);
String postDropReplDumpId = getResult(0,1,true,driver);
LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
- assert(run("REPL LOAD " + replDbName + " FROM '" + postDropReplDumpLocn + "'", true, driverMirror));
+ assert(run("REPL LOAD " + dbName + " INTO " + replDbName, true, driverMirror));
verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClientMirror);
verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2")), metaStoreClientMirror);
@@ -872,7 +872,6 @@ public class TestReplicationScenarios {
Tuple incrementalDump = replDumpDb(dbName);
String incrementalDumpLocn = incrementalDump.dumpLocation;
- replDumpId = incrementalDump.lastReplId;
// Rename the event directories in such a way that the length varies.
// We will encounter create_table, truncate followed by insert.
@@ -906,7 +905,7 @@ public class TestReplicationScenarios {
}
// Load from modified dump event directories.
- run("REPL LOAD " + replDbName + " FROM '" + incrementalDumpLocn + "'", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
}
@@ -1110,7 +1109,7 @@ public class TestReplicationScenarios {
// Drop partition after dump
run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')", driver);
- run("REPL LOAD " + replDbName + " FROM '" + postDropReplDumpLocn + "'", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
Exception e = null;
try {
@@ -1710,7 +1709,7 @@ public class TestReplicationScenarios {
verifyFail("SELECT * FROM " + dbName + ".unptned_tmp", driver);
// Dump all the events except DROP
- loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+ loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
// Need to find the tables and data as drop is not part of this dump
verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
@@ -1760,7 +1759,7 @@ public class TestReplicationScenarios {
verifyFail("SELECT * FROM " + dbName + ".ptned", driver);
// Replicate all the events except DROP
- loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+ loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
// Need to find the tables and data as drop is not part of this dump
verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
@@ -1793,7 +1792,7 @@ public class TestReplicationScenarios {
run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')", driver);
// Replicate only one INSERT INTO operation on the table.
- loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+ loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
// After Load from this dump, all target tables/partitions will have initial set of data but source will have latest data.
verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
@@ -1829,7 +1828,7 @@ public class TestReplicationScenarios {
verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite, driver);
// Replicate only 2 INSERT INTO operations.
- loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+ loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
incrementalDump = replDumpDb(dbName);
// After Load from this dump, all target tables/partitions will have initial set of data.
@@ -1837,7 +1836,7 @@ public class TestReplicationScenarios {
verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
// Replicate the remaining INSERT OVERWRITE operation on the table.
- loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+ loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
// After load, shall see the overwritten data.
verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
@@ -1932,7 +1931,7 @@ public class TestReplicationScenarios {
run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_renamed", driver);
run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed", driver);
- loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+ loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
@@ -1971,7 +1970,7 @@ public class TestReplicationScenarios {
run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=10)", driver);
- loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+ loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
verifyRun("SELECT a from " + replDbName + ".ptned where (b=10) ORDER BY a", empty, driverMirror);
@@ -2339,20 +2338,20 @@ public class TestReplicationScenarios {
run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_load1[0] + "')", driver);
verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1, driver);
- run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
// Dump and load only first insert (1 record)
- loadAndVerify(replDbName, firstInsert.dumpLocation, firstInsert.lastReplId);
+ loadAndVerify(replDbName, dbName, firstInsert.lastReplId);
verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1, driverMirror);
// Dump and load only second insert (2 records)
- loadAndVerify(replDbName, secondInsert.dumpLocation, secondInsert.lastReplId);
+ loadAndVerify(replDbName, dbName, secondInsert.lastReplId);
verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load2, driverMirror);
// Dump and load only truncate (0 records)
- loadAndVerify(replDbName, thirdTrunc.dumpLocation, thirdTrunc.lastReplId);
+ loadAndVerify(replDbName, dbName, thirdTrunc.lastReplId);
verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror);
// Dump and load insert after truncate (1 record)
@@ -2361,7 +2360,7 @@ public class TestReplicationScenarios {
}
@Test
- public void testIncrementalRepeatEventOnExistingObject() throws IOException {
+ public void testIncrementalRepeatEventOnExistingObject() throws IOException, InterruptedException {
String testName = "incrementalRepeatEventOnExistingObject";
String dbName = createDB(testName, driver);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
@@ -2383,46 +2382,46 @@ public class TestReplicationScenarios {
run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver);
Tuple replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// INSERT EVENT to partitioned table with dynamic ADD_PARTITION
run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=1) values('" + ptn_data_1[0] + "')", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// ADD_PARTITION EVENT to partitioned table
run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// INSERT EVENT to partitioned table on existing partition
run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=2) values('" + ptn_data_2[0] + "')", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// TRUNCATE_PARTITION EVENT on partitioned table
run("TRUNCATE TABLE " + dbName + ".ptned PARTITION (b=1)", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// TRUNCATE_TABLE EVENT on unpartitioned table
run("TRUNCATE TABLE " + dbName + ".unptned", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// CREATE_TABLE EVENT with multiple partitions
run("CREATE TABLE " + dbName + ".unptned_tmp AS SELECT * FROM " + dbName + ".ptned", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// ADD_CONSTRAINT EVENT
run("ALTER TABLE " + dbName + ".unptned_tmp ADD CONSTRAINT uk_unptned UNIQUE(a) disable", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// Replicate all the events happened so far
for (Tuple currDump : incrementalDumpList) {
// Load the incremental dump and ensure it does nothing and lastReplID remains same
- loadAndVerify(replDbName, currDump.dumpLocation, currDump.lastReplId);
+ loadAndVerify(replDbName, dbName, currDump.lastReplId);
}
Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName);
@@ -2432,18 +2431,15 @@ public class TestReplicationScenarios {
verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=1) ORDER BY a", empty, driverMirror);
verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=2) ORDER BY a", ptn_data_2, driverMirror);
- // Load each incremental dump from the list. Each dump have only one operation.
- for (Tuple currDump : incrementalDumpList) {
- // Load the incremental dump and ensure it does nothing and lastReplID remains same
- loadAndVerify(replDbName, currDump.dumpLocation, incrDump.lastReplId);
+ // Load the incremental dump and ensure it does nothing and lastReplID remains same
+ loadAndVerify(replDbName, dbName, incrDump.lastReplId);
- // Verify if the data are intact even after applying an applied event once again on existing objects
- verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror);
- verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror);
- verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
- verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=1) ORDER BY a", empty, driverMirror);
- verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=2) ORDER BY a", ptn_data_2, driverMirror);
- }
+ // Verify if the data are intact even after applying an applied event once again on existing objects
+ verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror);
+ verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror);
+ verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
+ verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=1) ORDER BY a", empty, driverMirror);
+ verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=2) ORDER BY a", ptn_data_2, driverMirror);
}
@Test
@@ -2468,76 +2464,76 @@ public class TestReplicationScenarios {
run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver);
Tuple replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// INSERT EVENT to partitioned table with dynamic ADD_PARTITION
run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// ADD_PARTITION EVENT to partitioned table
run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// INSERT EVENT to partitioned table on existing partition
run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// TRUNCATE_PARTITION EVENT on partitioned table
run("TRUNCATE TABLE " + dbName + ".ptned PARTITION(b=1)", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// TRUNCATE_TABLE EVENT on unpartitioned table
run("TRUNCATE TABLE " + dbName + ".unptned", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// CREATE_TABLE EVENT on partitioned table
run("CREATE TABLE " + dbName + ".ptned_tmp (a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// INSERT EVENT to partitioned table with dynamic ADD_PARTITION
run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=10) values('" + ptn_data_1[0] + "')", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// INSERT EVENT to partitioned table with dynamic ADD_PARTITION
run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=20) values('" + ptn_data_2[0] + "')", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// DROP_PARTITION EVENT to partitioned table
run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b=1)", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// RENAME_PARTITION EVENT to partitioned table
run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=20)", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// RENAME_TABLE EVENT to unpartitioned table
run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_new", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// ADD_CONSTRAINT EVENT
run("ALTER TABLE " + dbName + ".ptned_tmp ADD CONSTRAINT uk_unptned UNIQUE(a) disable", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// DROP_TABLE EVENT to partitioned table
run("DROP TABLE " + dbName + ".ptned_tmp", driver);
replDump = replDumpDb(dbName);
incrementalDumpList.add(replDump);
-
+ Thread.sleep(1000);
// Load each incremental dump from the list. Each dump has only one operation.
for (Tuple currDump : incrementalDumpList) {
// Load the current incremental dump and ensure it does nothing and lastReplID remains same
- loadAndVerify(replDbName, currDump.dumpLocation, currDump.lastReplId);
+ loadAndVerify(replDbName, dbName, currDump.lastReplId);
}
// Replicate all the events happened so far
Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName);
@@ -2551,19 +2547,17 @@ public class TestReplicationScenarios {
verifyIfPartitionExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("20")), metaStoreClientMirror);
// Load each incremental dump from the list. Each dump has only one operation.
- for (Tuple currDump : incrementalDumpList) {
- // Load the current incremental dump and ensure it does nothing and lastReplID remains same
- loadAndVerify(replDbName, currDump.dumpLocation, incrDump.lastReplId);
-
- // Verify if the data are intact even after applying an applied event once again on missing objects
- verifyIfTableNotExist(replDbName, "unptned", metaStoreClientMirror);
- verifyIfTableNotExist(replDbName, "ptned_tmp", metaStoreClientMirror);
- verifyIfTableExist(replDbName, "unptned_new", metaStoreClientMirror);
- verifyIfTableExist(replDbName, "ptned", metaStoreClientMirror);
- verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClientMirror);
- verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2")), metaStoreClientMirror);
- verifyIfPartitionExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("20")), metaStoreClientMirror);
- }
+ // Load the current incremental dump and ensure it does nothing and lastReplID remains same
+ loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+
+ // Verify if the data are intact even after applying an applied event once again on missing objects
+ verifyIfTableNotExist(replDbName, "unptned", metaStoreClientMirror);
+ verifyIfTableNotExist(replDbName, "ptned_tmp", metaStoreClientMirror);
+ verifyIfTableExist(replDbName, "unptned_new", metaStoreClientMirror);
+ verifyIfTableExist(replDbName, "ptned", metaStoreClientMirror);
+ verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClientMirror);
+ verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2")), metaStoreClientMirror);
+ verifyIfPartitionExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("20")), metaStoreClientMirror);
}
@Test
@@ -2661,14 +2655,14 @@ public class TestReplicationScenarios {
// Replicate all the events happened so far. It should fail as the data files missing in
// original path and not available in CM as well.
Tuple incrDump = replDumpDb(dbName);
- verifyFail("REPL LOAD " + replDbName + " FROM '" + incrDump.dumpLocation + "'", driverMirror);
+ verifyFail("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror);
verifyFail("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", driverMirror);
// Move the files back to original data location
assert(dataFs.rename(tmpLoc, ptnLoc));
- loadAndVerify(replDbName, incrDump.dumpLocation, incrDump.lastReplId);
+ loadAndVerify(replDbName, dbName, incrDump.lastReplId);
verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
verifyRun("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", ptn_data_1, driverMirror);
@@ -2890,7 +2884,7 @@ public class TestReplicationScenarios {
String replDumpLocn = replDumpDb(dbName).dumpLocation;
// Reset the driver
driverMirror.close();
- run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
// Calling close() explicitly to clean up the staging dirs
driverMirror.close();
// Check result
@@ -2941,7 +2935,7 @@ public class TestReplicationScenarios {
run("TRUNCATE TABLE " + dbName + ".unptned", driver);
LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
- run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
verifyRun("SELECT count(*) from " + replDbName + ".unptned", new String[]{"2"}, driverMirror);
}
@@ -3147,7 +3141,7 @@ public class TestReplicationScenarios {
fs.delete(path);
try {
- driverMirror.run("REPL LOAD " + dbName + " FROM '" + dumpLocation + "'");
+ driverMirror.run("REPL LOAD " + dbName + " INTO " + dbName);
assert false;
} catch (CommandProcessorException e) {
assertTrue(e.getResponseCode() == ErrorMsg.REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH.getErrorCode());
@@ -3287,8 +3281,8 @@ public class TestReplicationScenarios {
String replDbName = dbName + "_replica";
Tuple dump = replDumpDb(dbName);
- run("REPL LOAD " + replDbName + " FROM '" + dump.dumpLocation +
- "' with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName +
+ " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
verifyRun("REPL STATUS " + replDbName, dump.lastReplId, driverMirror);
run(" use " + replDbName, driverMirror);
@@ -3308,8 +3302,7 @@ public class TestReplicationScenarios {
String dbName = createDB(testName, driver);
String replDbName = dbName + "_replica";
- Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
- String replDumpId = bootstrapDump.lastReplId;
+ bootstrapLoadAndVerify(dbName, replDbName);
String[] unptn_data = new String[] { "eleven", "twelve" };
@@ -3322,10 +3315,9 @@ public class TestReplicationScenarios {
verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver);
Tuple incrementalDump = replDumpDb(dbName);
- run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation +
- "' with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName +
+ " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror);
- replDumpId = incrementalDump.lastReplId;
verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data, driverMirror);
@@ -3340,8 +3332,8 @@ public class TestReplicationScenarios {
verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite, driver);
incrementalDump = replDumpDb(dbName);
- run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation +
- "' with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+ run("REPL LOAD " + dbName + " INTO " + replDbName +
+ " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror);
verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driverMirror);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index f3a1abb..2854045 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -51,6 +51,7 @@ import java.util.Collections;
import java.util.Map;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.junit.Assert.assertEquals;
/**
* TestReplicationScenariosAcidTables - test replication for ACID tables.
@@ -90,6 +91,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
acidEnableConf.putAll(overrides);
primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+ acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
Map<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
@@ -97,6 +99,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
put("hive.metastore.client.capability.check", "false");
}};
+ overridesForHiveConf1.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
}
@@ -117,7 +120,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
public void testAcidTablesBootstrap() throws Throwable {
// Bootstrap
WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
- replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
// First incremental, after bootstrap
@@ -126,7 +129,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
LOG.info(testName.getMethodName() + ": first incremental dump and load.");
WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
.dump(primaryDbName);
- replica.load(replicatedDbName, incDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
// Second incremental, after bootstrap
@@ -135,14 +138,14 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
LOG.info(testName.getMethodName() + ": second incremental dump and load.");
WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
.dump(primaryDbName);
- replica.load(replicatedDbName, inc2Dump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
}
@Test
public void testAcidTablesMoveOptimizationBootStrap() throws Throwable {
WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
- replica.load(replicatedDbName, bootstrapDump.dumpLocation,
+ replica.load(replicatedDbName, primaryDbName,
Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
}
@@ -150,10 +153,10 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
@Test
public void testAcidTablesMoveOptimizationIncremental() throws Throwable {
WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootstrapDump.dumpLocation,
+ replica.load(replicatedDbName, primaryDbName,
Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName, null);
- replica.load(replicatedDbName, incrDump.dumpLocation,
+ replica.load(replicatedDbName, primaryDbName,
Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId, true);
}
@@ -196,7 +199,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
// Bootstrap load which should also replicate the aborted write ids on both tables.
HiveConf replicaConf = replica.getConf();
- replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"t1", "t2"})
@@ -280,7 +283,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
}
// Bootstrap dump has taken snapshot before concurrent thread performed write. So, it won't see data "2".
- replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(bootstrapDump.lastReplicationId)
@@ -289,7 +292,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
// Incremental should include the concurrent write of data "2" from another txn.
WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
@@ -354,7 +357,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
}
// Bootstrap dump has taken latest list of tables and hence won't see table t1 as it is dropped.
- replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(bootstrapDump.lastReplicationId)
@@ -368,7 +371,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
.run("insert into t1 values(100)")
.dump(primaryDbName);
- replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
@@ -380,7 +383,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
public void testOpenTxnEvent() throws Throwable {
String tableName = testName.getMethodName();
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
@@ -401,12 +404,12 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
primary.testEventCounts(primaryDbName, lastReplId, null, null, 22);
// Test load
- replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId);
// Test the idempotent behavior of Open and Commit Txn
- replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId);
}
@@ -415,7 +418,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
public void testAbortTxnEvent() throws Throwable {
String tableNameFail = testName.getMethodName() + "Fail";
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
@@ -429,12 +432,12 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
WarehouseInstance.Tuple incrementalDump =
primary.dump(primaryDbName);
- replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId);
// Test the idempotent behavior of Abort Txn
- replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId);
}
@@ -443,7 +446,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
public void testTxnEventNonAcid() throws Throwable {
String tableName = testName.getMethodName();
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
- replicaNonAcid.load(replicatedDbName, bootStrapDump.dumpLocation)
+ replicaNonAcid.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
@@ -460,7 +463,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
WarehouseInstance.Tuple incrementalDump =
primary.dump(primaryDbName);
- replicaNonAcid.runFailure("REPL LOAD " + replicatedDbName + " FROM '" + incrementalDump.dumpLocation + "'")
+ replicaNonAcid.runFailure("REPL LOAD " + primaryDbName + " INTO " + replicatedDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
}
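The hunk above spells out the syntax change at the command level: the location-based
"REPL LOAD <dbName> FROM '<dumpLocation>'" becomes the database-based
"REPL LOAD <sourceDbName> INTO <targetDbName>". A minimal sketch of issuing the new form
over JDBC (the endpoint and database names here are assumptions for illustration, not part
of this patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ReplLoadSyntaxSketch {
      public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://replica-host:10000/default");
             Statement stmt = conn.createStatement()) {
          // Old syntax (pre-HIVE-22954), driven by an explicit dump directory:
          //   REPL LOAD tgt_db FROM '/some/dump/location'
          // New syntax, driven by the source database name; the dump location is
          // resolved by Hive itself, which is what makes scheduled loads possible.
          stmt.execute("REPL LOAD src_db INTO tgt_db");
        }
      }
    }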
@@ -499,7 +502,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
- replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+ replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
callerVerifier.assertInjectionsPerformed(true, false);
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -527,7 +530,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
// Retrying with the same dump with which it was already loaded should resume the bootstrap load.
// This time, it completes by adding just constraints for table t4.
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
callerVerifier.assertInjectionsPerformed(true, false);
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -604,9 +607,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
String txnStrStart = "START TRANSACTION";
String txnStrCommit = "COMMIT";
- WarehouseInstance.Tuple incrementalDump;
primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')");
- WarehouseInstance.Tuple bootStrapDump = primary.dump("`*`");
primary.run("use " + primaryDbName)
.run("create database " + dbName1 + " WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
@@ -638,7 +639,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
.verifyResults(resultArray)
.run(txnStrCommit);
- incrementalDump = primary.dump("`*`");
+ primary.dump("`*`");
// Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
// we are not able to create multiple embedded derby instances for two different MetaStore instances.
@@ -646,20 +647,10 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
primary.run("drop database " + dbName1 + " cascade");
primary.run("drop database " + dbName2 + " cascade");
// End of additional steps
-
- replica.loadWithoutExplain("", bootStrapDump.dumpLocation)
- .run("REPL STATUS default")
- .verifyResult(bootStrapDump.lastReplicationId);
-
- replica.loadWithoutExplain("", incrementalDump.dumpLocation)
- .run("REPL STATUS " + dbName1)
- .run("select key from " + dbName1 + "." + tableName + " order by key")
- .verifyResults(resultArray)
- .run("select key from " + dbName2 + "." + tableName + " order by key")
- .verifyResults(resultArray);
-
- replica.run("drop database " + primaryDbName + " cascade");
- replica.run("drop database " + dbName1 + " cascade");
- replica.run("drop database " + dbName2 + " cascade");
+ try {
+ replica.loadWithoutExplain("", "`*`");
+ } catch (SemanticException e) {
+ assertEquals("REPL LOAD * is not supported", e.getMessage());
+ }
}
}
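Throughout these tests the two-argument load(targetDbName, sourceDbName) call, optionally
with a WITH clause, replaces the old location-based overload. A sketch of how such a helper
could assemble the command string (an illustration in the spirit of WarehouseInstance, not
the patch's actual implementation, which appears further down this diff):

    import java.util.List;

    // Hypothetical helper; the real WarehouseInstance code may differ.
    static String replLoadCommand(String sourceDbName, String targetDbName, List<String> withClauseOptions) {
      StringBuilder cmd = new StringBuilder("REPL LOAD ").append(sourceDbName)
          .append(" INTO ").append(targetDbName);
      if (withClauseOptions != null && !withClauseOptions.isEmpty()) {
        // Each option arrives pre-quoted, e.g. "'hive.repl.approx.max.load.tasks'='1'".
        cmd.append(" WITH (").append(String.join(", ", withClauseOptions)).append(')');
      }
      return cmd.toString();
    }

For a singleton list holding "'hive.repl.approx.max.load.tasks'='1'" this yields
REPL LOAD src_db INTO tgt_db WITH ('hive.repl.approx.max.load.tasks'='1').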
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
index f5dd043..36841ba 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
@@ -68,7 +68,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
dumpWithoutAcidClause);
LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
- replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, false);
// Take an incremental dump with acid table bootstrap
@@ -77,7 +77,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
LOG.info(testName.getMethodName() + ": incremental dump and load dump with acid table bootstrap.");
WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
.dump(primaryDbName, dumpWithAcidBootstrapClause);
- replica.load(replicatedDbName, incrementalDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyIncLoad(replicatedDbName, incrementalDump.lastReplicationId);
// Ckpt should be set on bootstrapped tables.
replica.verifyIfCkptSetForTables(replicatedDbName, acidTableNames, incrementalDump.dumpLocation);
@@ -90,7 +90,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
"bootstrap.");
WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
.dump(primaryDbName);
- replica.load(replicatedDbName, inc2Dump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
}
@@ -99,7 +99,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
dumpWithoutAcidClause);
LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
- replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, false);
prepareIncAcidData(primaryDbName);
@@ -129,7 +129,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
try {
LOG.info(testName.getMethodName()
+ ": loading first incremental dump with acid table bootstrap (will fail)");
- replica.loadFailure(replicatedDbName, incDump.dumpLocation);
+ replica.loadFailure(replicatedDbName, primaryDbName);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetAlterTableModifier();
@@ -149,7 +149,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
LOG.info(testName.getMethodName()
+ ": trying to load second incremental dump with wrong bootstrap dump "
+ " specified for cleaning ACID tables. Should fail.");
- replica.loadFailure(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+ replica.loadFailure(replicatedDbName, primaryDbName, loadWithClause);
// Set the previously failed bootstrap dump to clean up. Now, the new bootstrap should overwrite the old one.
loadWithClause = Collections.singletonList(
@@ -159,7 +159,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
LOG.info(testName.getMethodName()
+ ": trying to load second incremental dump with correct bootstrap dump "
+ "specified for cleaning ACID tables. Should succeed.");
- replica.load(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+ replica.load(replicatedDbName, primaryDbName, loadWithClause);
verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
// Once the REPL LOAD is successful, this config should be unset; otherwise, the subsequent REPL LOAD
@@ -170,7 +170,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
LOG.info(testName.getMethodName()
+ ": trying to load second incremental dump (with acid bootstrap) again."
+ " Should succeed.");
- replica.load(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+ replica.load(replicatedDbName, primaryDbName, loadWithClause);
verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
}
@@ -178,7 +178,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
public void retryIncBootstrapAcidFromDifferentDumpWithoutCleanTablesConfig() throws Throwable {
WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
dumpWithoutAcidClause);
- replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
prepareIncAcidData(primaryDbName);
prepareIncNonAcidData(primaryDbName);
@@ -186,10 +186,10 @@ public class TestReplicationScenariosAcidTablesBootstrap
.dump(primaryDbName, dumpWithAcidBootstrapClause);
WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
.dump(primaryDbName, dumpWithAcidBootstrapClause);
- replica.load(replicatedDbName, incDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
// Re-bootstrapping from a different bootstrap dump without the clean-tables config should fail.
- replica.loadFailure(replicatedDbName, inc2Dump.dumpLocation, Collections.emptyList(),
+ replica.loadFailure(replicatedDbName, primaryDbName, Collections.emptyList(),
ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
}
@@ -199,7 +199,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
dumpWithoutAcidClause);
LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
- replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
// Open concurrent transactions, create data for incremental and take an incremental dump
// with ACID table bootstrap.
@@ -232,7 +232,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
// tables t1 and t2
HiveConf replicaConf = replica.getConf();
LOG.info(testName.getMethodName() + ": loading incremental dump with ACID bootstrap.");
- replica.load(replicatedDbName, incDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
// Verify if HWM is properly set after REPL LOAD
verifyNextId(tables, replicatedDbName, replicaConf);
@@ -257,7 +257,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
dumpWithoutAcidClause);
LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
- replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
// Create incremental data for incremental load with bootstrap of ACID
prepareIncNonAcidData(primaryDbName);
@@ -315,7 +315,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
// write. So concurrent writes won't be dumped.
LOG.info(testName.getMethodName() +
": loading incremental dump containing bootstrapped ACID tables.");
- replica.load(replicatedDbName, incDump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
// Next Incremental should include the concurrent writes
@@ -324,7 +324,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
WarehouseInstance.Tuple inc2Dump = primary.dump(primaryDbName);
LOG.info(testName.getMethodName() +
": loading second normal incremental dump from event id = " + incDump.lastReplicationId);
- replica.load(replicatedDbName, inc2Dump.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
}
}
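The retry flow above hinges on pointing the load at the bootstrap dump that must be cleaned
up before re-bootstrapping can proceed. A sketch of that pattern, reusing the constant these
tests already import; the dump-path variable is a placeholder, not a value from this patch:

    import java.util.Collections;
    import java.util.List;

    // 'correctBootstrapDumpLocation' is a placeholder for the failed bootstrap's dump path.
    List<String> loadWithClause = Collections.singletonList(
        "'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" + correctBootstrapDumpLocation + "'");
    // Fails if the wrong dump path is given; with the correct one it drops the
    // half-loaded ACID tables and then succeeds.
    replica.load(replicatedDbName, primaryDbName, loadWithClause);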
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index eb8a8995..ff1de9e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -77,7 +77,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
@Test
public void testCreateFunctionIncrementalReplication() throws Throwable {
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
@@ -89,7 +89,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
WarehouseInstance.Tuple incrementalDump =
primary.dump(primaryDbName);
- replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
@@ -97,7 +97,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
replicatedDbName + ".testFunctionTwo" });
// Test the idempotent behavior of CREATE FUNCTION
- replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
@@ -141,7 +141,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'",
"'hive.in.repl.test.files.sorted'='true'");
try {
- replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+ replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -175,7 +175,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
try {
// Retrying with the same dump with which it was already loaded should resume the bootstrap load.
// This time, it completes by adding just the function f2
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -196,7 +196,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
+ ".testFunctionAnother as 'hivemall.tools.string.StopwordUDF' "
+ "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'");
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
@@ -204,14 +204,14 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
WarehouseInstance.Tuple incrementalDump =
primary.dump(primaryDbName);
- replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '%testfunctionanother%'")
.verifyResult(null);
// Test the idempotent behavior of DROP FUNCTION
- replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '%testfunctionanother%'")
@@ -225,7 +225,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
+ "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'");
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResult(replicatedDbName + ".testFunction");
}
@@ -241,7 +241,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResult(replicatedDbName + ".anotherFunction");
@@ -262,7 +262,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
@Test
public void testIncrementalCreateFunctionWithFunctionBinaryJarsOnHDFS() throws Throwable {
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
@@ -275,7 +275,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
.verifyResult(replicatedDbName + ".anotherFunction");
@@ -354,7 +354,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
List<String> withClause = Collections.singletonList(
"'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'");
- replica.load(replicatedDbName, tuple.dumpLocation, withClause)
+ replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "t1", "t2", "t3" })
@@ -382,7 +382,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dump(primaryDbName);
replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
@@ -403,7 +403,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.run("insert into table1 values (1,2)")
.dump(primaryDbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "acid_table", "table1" })
@@ -422,7 +422,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.run("insert into table1 values (1,2)")
.dump(primaryDbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
- replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "table1", "table2", "table3" })
@@ -440,7 +440,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
"repl dump " + primaryDbName + " with ('hive.repl.dump.metadata.only'='true')"
);
- replica.load(replicatedDbName, incrementalOneTuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "renamed_table1", "table2", "table3", "table4" })
@@ -458,7 +458,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dumpWithCommand("repl dump " + primaryDbName + " with ('hive.repl.dump.metadata.only'='true')"
);
- replica.load(replicatedDbName, secondIncremental.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "table2", "table3", "table4" })
@@ -496,7 +496,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.run("insert into table1 values (1,2)")
.dump(dbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, dbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[]{"table1", "table2", "table3"})
@@ -510,7 +510,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.run("create table table4 (i int, j int)")
.dump(dbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, dbName)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "renamed_table1", "table2", "table3", "table4" })
@@ -563,126 +563,12 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
// Reset ckpt and last repl ID keys to empty set for allowing bootstrap load
replica.run("show databases")
.verifyFailure(new String[] { primaryDbName, dbOne, dbTwo })
- .run("alter database default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')")
- .load("", tuple.dumpLocation)
- .run("show databases")
- .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo })
- .run("use " + primaryDbName)
- .run("show tables")
- .verifyResults(new String[] { "t1" })
- .run("use " + dbOne)
- .run("show tables")
- .verifyResults(new String[] { "t1" })
- .run("use " + dbTwo)
- .run("show tables")
- .verifyResults(new String[] { "t1" })
- .verifyReplTargetProperty(primaryDbName)
- .verifyReplTargetProperty(dbOne)
- .verifyReplTargetProperty(dbTwo);
-
- /*
- Start of cleanup
- */
-
- replica.run("drop database " + primaryDbName + " cascade");
- replica.run("drop database " + dbOne + " cascade");
- replica.run("drop database " + dbTwo + " cascade");
-
- /*
- End of cleanup
- */
- }
-
- @Test
- public void testIncrementalDumpOfWarehouse() throws Throwable {
- String randomOne = RandomStringUtils.random(10, true, false);
- String randomTwo = RandomStringUtils.random(10, true, false);
- String dbOne = primaryDbName + randomOne;
- primary.run("alter database default set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')");
- WarehouseInstance.Tuple bootstrapTuple = primary
- .run("use " + primaryDbName)
- .run("create table t1 (i int, j int)")
- .run("create database " + dbOne + " WITH DBPROPERTIES ( '" +
- SOURCE_OF_REPLICATION + "' = '1,2,3')")
- .run("use " + dbOne)
- .run("create table t1 (i int, j int) partitioned by (load_date date) "
- + "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ")
- .dump("`*`", Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
-
- String dbTwo = primaryDbName + randomTwo;
- WarehouseInstance.Tuple incrementalTuple = primary
- .run("create database " + dbTwo + " WITH DBPROPERTIES ( '" +
- SOURCE_OF_REPLICATION + "' = '1,2,3')")
- .run("use " + dbTwo)
- .run("create table t1 (i int, j int)")
- .run("use " + dbOne)
- .run("create table t2 (a int, b int)")
- .dump("`*`", Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
-
- /*
- Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
- we are not able to create multiple embedded derby instances for two different MetaStore instances.
- */
-
- primary.run("drop database " + primaryDbName + " cascade");
- primary.run("drop database " + dbOne + " cascade");
- primary.run("drop database " + dbTwo + " cascade");
-
- /*
- End of additional steps
- */
-
- // Reset ckpt and last repl ID keys to empty set for allowing bootstrap load
- replica.run("show databases")
- .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo })
- .run("alter database default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')")
- .load("", bootstrapTuple.dumpLocation)
- .run("show databases")
- .verifyResults(new String[] { "default", primaryDbName, dbOne })
- .run("use " + primaryDbName)
- .run("show tables")
- .verifyResults(new String[] { "t1" })
- .run("use " + dbOne)
- .run("show tables")
- .verifyResults(new String[] { "t1" })
- .verifyReplTargetProperty(primaryDbName)
- .verifyReplTargetProperty(dbOne)
- .verifyReplTargetProperty(dbTwo);
-
- assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
- assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
- assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(dbOne).getParameters()));
-
- replica.load("", incrementalTuple.dumpLocation)
- .run("show databases")
- .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo })
- .run("use " + dbTwo)
- .run("show tables")
- .verifyResults(new String[] { "t1" })
- .run("use " + dbOne)
- .run("show tables")
- .verifyResults(new String[] { "t1", "t2" })
- .verifyReplTargetProperty(primaryDbName)
- .verifyReplTargetProperty(dbOne)
- .verifyReplTargetProperty(dbTwo);
-
- assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
- assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
- assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(dbOne).getParameters()));
- assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(dbTwo).getParameters()));
-
- /*
- Start of cleanup
- */
-
- replica.run("drop database " + primaryDbName + " cascade");
- replica.run("drop database " + dbOne + " cascade");
- replica.run("drop database " + dbTwo + " cascade");
-
- /*
- End of cleanup
- */
-
+ .run("alter database default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')");
+ try {
+ replica.load("", "`*`");
+ } catch (SemanticException e) {
+ assertEquals("REPL LOAD * is not supported", e.getMessage());
+ }
}
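Note that the try/catch above passes silently when no exception is thrown. A stricter way to
pin down the new "REPL LOAD * is not supported" behavior, assuming JUnit 4.13+ is available
to the test, would be:

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertThrows;

    SemanticException e = assertThrows(SemanticException.class,
        () -> replica.load("", "`*`"));
    assertEquals("REPL LOAD * is not supported", e.getMessage());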
@Test
@@ -702,7 +588,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dump(primaryDbName);
// Run load on primary itself
- primary.load(replicatedDbName, bootstrapTuple.dumpLocation, withConfigs)
+ primary.load(replicatedDbName, primaryDbName, withConfigs)
.status(replicatedDbName, withConfigs)
.verifyResult(bootstrapTuple.lastReplicationId);
@@ -727,7 +613,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dump(primaryDbName, Collections.emptyList());
// Run load on primary itself
- primary.load(replicatedDbName, incrementalOneTuple.dumpLocation, withConfigs)
+ primary.load(replicatedDbName, primaryDbName, withConfigs)
.status(replicatedDbName, withConfigs)
.verifyResult(incrementalOneTuple.lastReplicationId);
@@ -756,7 +642,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dump(primaryDbName, Collections.emptyList());
// Run load on primary itself
- primary.load(replicatedDbName, secondIncremental.dumpLocation, withConfigs)
+ primary.load(replicatedDbName, primaryDbName, withConfigs)
.status(replicatedDbName, withConfigs)
.verifyResult(secondIncremental.lastReplicationId);
@@ -790,7 +676,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName);
// Bootstrap load in replica
- replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(bootstrapTuple.lastReplicationId);
@@ -815,7 +701,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dump(primaryDbName, Collections.emptyList());
// First incremental load
- replica.load(replicatedDbName, firstIncremental.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(firstIncremental.lastReplicationId)
.run("use " + replicatedDbName)
@@ -827,7 +713,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.verifyResults(new String[] {"1"});
// Second incremental load
- replica.load(replicatedDbName, secondIncremental.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(secondIncremental.lastReplicationId)
.run("use " + replicatedDbName)
@@ -845,7 +731,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName);
// Bootstrap load in replica
- replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(bootstrapTuple.lastReplicationId);
@@ -873,7 +759,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dump(primaryDbName, Collections.emptyList());
// First incremental load
- replica.load(replicatedDbName, firstIncremental.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(firstIncremental.lastReplicationId)
.run("use " + replicatedDbName)
@@ -885,7 +771,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.verifyResults(new String[] { "3" });
// Second incremental load
- replica.load(replicatedDbName, secondIncremental.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(secondIncremental.lastReplicationId)
.run("use " + replicatedDbName)
@@ -938,7 +824,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dump(primaryDbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
// Bootstrap load in replica
- replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(bootstrapTuple.lastReplicationId)
.run("use " + replicatedDbName)
@@ -963,13 +849,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId);
tuple = primary.dump(primaryDbName, Collections.emptyList());
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId);
@@ -980,26 +866,26 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dump(primaryDbName, Collections.emptyList());
// Incremental load to an existing database with an empty dump directory should set the repl id to the last event at src.
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId);
// Bootstrap load from an empty dump directory should return an empty load directory error.
tuple = primary.dump("someJunkDB", Collections.emptyList());
try {
- replica.runCommand("REPL LOAD someJunkDB from '" + tuple.dumpLocation + "'");
+ replica.runCommand("REPL LOAD someJunkDB into someJunkDB");
assert false;
} catch (CommandProcessorException e) {
assertTrue(e.getMessage().toLowerCase().contains("semanticException no data to load in path".toLowerCase()));
}
- // Incremental load to non existing db should return database not exist error.
+ // Bootstrap load from an empty dump directory should also return an empty load directory error,
+ // since the repl status check is now done on the target.
tuple = primary.dump("someJunkDB");
try {
- replica.runCommand("REPL LOAD someJunkDB from '" + tuple.dumpLocation+"'");
+ replica.runCommand("REPL LOAD someJunkDB into someJunkDB ");
} catch (CommandProcessorException e) {
- assertTrue(e.getMessage().toLowerCase().contains(
- "org.apache.hadoop.hive.ql.ddl.DDLTask. Database does not exist: someJunkDB".toLowerCase()));
+ assertTrue(e.getMessage().toLowerCase().contains("semanticException no data to load in path".toLowerCase()));
}
primary.run(" drop database if exists " + testDbName + " cascade");
@@ -1009,7 +895,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
public void testIncrementalDumpMultiIteration() throws Throwable {
WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(bootstrapTuple.lastReplicationId);
@@ -1022,7 +908,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.run("insert into table3 partition(country='india') values(3)")
.dump(primaryDbName, Collections.emptyList());
- replica.load(replicatedDbName, incremental.dumpLocation,
+ replica.load(replicatedDbName, primaryDbName,
Collections.singletonList("'hive.repl.approx.max.load.tasks'='10'"))
.status(replicatedDbName)
.verifyResult(incremental.lastReplicationId)
@@ -1047,7 +933,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
FileStatus[] fileStatus = fs.listStatus(path);
int numEvents = fileStatus.length - 1; //one is metadata file
- replica.load(replicatedDbName, incremental.dumpLocation,
+ replica.load(replicatedDbName, primaryDbName,
Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'"))
.run("use " + replicatedDbName)
.run("show tables")
@@ -1067,7 +953,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dump(primaryDbName);
// Bootstrap Repl A -> B
- replica.load(replicatedDbName, tuplePrimary.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuplePrimary.lastReplicationId)
.run("show tblproperties t1('custom.property')")
@@ -1079,12 +965,12 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
// do an empty incremental load to allow dump of replicatedDbName
WarehouseInstance.Tuple temp = primary.dump(primaryDbName, Collections.emptyList());
- replica.load(replicatedDbName, temp.dumpLocation); // first successful incremental load.
+ replica.load(replicatedDbName, primaryDbName); // first successful incremental load.
// Bootstrap Repl B -> C
WarehouseInstance.Tuple tupleReplica = replica.dump(replicatedDbName);
String replDbFromReplica = replicatedDbName + "_dupe";
- replica.load(replDbFromReplica, tupleReplica.dumpLocation)
+ replica.load(replDbFromReplica, replicatedDbName)
.run("use " + replDbFromReplica)
.run("repl status " + replDbFromReplica)
.verifyResult(tupleReplica.lastReplicationId)
@@ -1113,7 +999,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dump(primaryDbName, Collections.emptyList());
// Incremental Repl A -> B with alters on db/table/partition
- WarehouseInstance.Tuple tupleReplicaInc = replica.load(replicatedDbName, tuplePrimaryInc.dumpLocation)
+ WarehouseInstance.Tuple tupleReplicaInc = replica.load(replicatedDbName, primaryDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuplePrimaryInc.lastReplicationId)
.dump(replicatedDbName, Collections.emptyList());
@@ -1127,7 +1013,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
verifyIfCkptPropMissing(india.getParameters());
// Incremental Repl B -> C with alters on db/table/partition
- replica.load(replDbFromReplica, tupleReplicaInc.dumpLocation)
+ replica.load(replDbFromReplica, replicatedDbName)
.run("use " + replDbFromReplica)
.run("repl status " + replDbFromReplica)
.verifyResult(tupleReplicaInc.lastReplicationId)
@@ -1158,7 +1044,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
// Bootstrap Repl A -> B and then export table t1
String path = "hdfs:///tmp/" + replicatedDbName + "/";
String exportPath = "'" + path + "1/'";
- replica.load(replicatedDbName, tuplePrimary.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuplePrimary.lastReplicationId)
.run("use " + replicatedDbName)
@@ -1199,7 +1085,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.run("insert into table t2 partition(country='us') values ('sfo')")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
@@ -1212,12 +1098,12 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation);
// Retrying with the same dump with which it was already loaded also fails.
- replica.loadFailure(replicatedDbName, tuple.dumpLocation);
+ replica.loadFailure(replicatedDbName, primaryDbName);
// Retry from same dump when the database is empty is also not allowed.
replica.run("drop table t1")
.run("drop table t2")
- .loadFailure(replicatedDbName, tuple.dumpLocation);
+ .loadFailure(replicatedDbName, primaryDbName);
}
@Test
@@ -1257,7 +1143,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
// Trigger bootstrap load which creates just table t1; other tables (t2, t3) and constraints are not loaded.
List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
try {
- replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+ replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -1293,7 +1179,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
try {
// Retrying with the same dump with which it was already loaded should resume the bootstrap load.
// This time, it fails when trying to load the foreign key constraints. All other constraints are loaded.
- replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+ replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -1331,7 +1217,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
try {
// Retrying with the same dump with which it was already loaded should resume the bootstrap load.
// This time, it completes by adding just foreign key constraints for table t2.
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -1382,7 +1268,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
// Make sure that there's some order in which the objects are loaded.
List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'",
"'hive.in.repl.test.files.sorted'='true'");
- replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+ replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
InjectableBehaviourObjectStore.setAlterPartitionsBehaviour(null); // reset the behaviour
alterPartitionStub.assertInjectionsPerformed(true, false);
@@ -1414,7 +1300,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
try {
// Retrying with the same dump with which it was already loaded should resume the bootstrap load.
// This time, it completes by adding the remaining partitions and the function.
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
callerVerifier.assertInjectionsPerformed(false, false);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -1441,7 +1327,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.dump(primaryDbName);
testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2",
- "ADD_PARTITION", tuple);
+ "ADD_PARTITION");
}
@Test
@@ -1454,8 +1340,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("create table t1 (place string) partitioned by (country string)")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
- replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs);
+ replica.load(replicatedDbName, primaryDbName, withConfigs);
+ replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
.run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
@@ -1463,7 +1349,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.run("insert overwrite table t1 select * from t2")
.dump(primaryDbName, Collections.emptyList());
- testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "ADD_PARTITION", tuple);
+ testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "ADD_PARTITION");
}
@Test
@@ -1471,24 +1357,24 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
List<String> withConfigs =
Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
String replicatedDbName_CM = replicatedDbName + "_CM";
- WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.run("create table t2 (place string) partitioned by (country string)")
.run("ALTER TABLE t2 ADD PARTITION (country='india')")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
- replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs);
+ replica.load(replicatedDbName, primaryDbName, withConfigs);
+ replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
.run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
- tuple = primary.run("use " + primaryDbName)
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
.run("insert into table t2 partition(country='india') values ('bangalore')")
.dump(primaryDbName, Collections.emptyList());
- testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT", tuple);
+ testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT");
}
private void testMoveOptimization(String primaryDb, String replicaDb, String replicatedDbName_CM,
- String tbl, String eventType, WarehouseInstance.Tuple tuple) throws Throwable {
+ String tbl, String eventType) throws Throwable {
List<String> withConfigs =
Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
@@ -1510,13 +1396,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
try {
- replica.loadFailure(replicaDb, tuple.dumpLocation, withConfigs);
+ replica.loadFailure(replicaDb, primaryDbName, withConfigs);
} finally {
InjectableBehaviourObjectStore.resetAddNotificationModifier();
}
callerVerifier.assertInjectionsPerformed(true, false);
- replica.load(replicaDb, tuple.dumpLocation, withConfigs);
+ replica.load(replicaDb, primaryDbName, withConfigs);
replica.run("use " + replicaDb)
.run("select country from " + tbl + " where country == 'india'")
@@ -1527,13 +1413,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
try {
- replica.loadFailure(replicatedDbName_CM, tuple.dumpLocation, withConfigs);
+ replica.loadFailure(replicatedDbName_CM, primaryDbName, withConfigs);
} finally {
InjectableBehaviourObjectStore.resetAddNotificationModifier();
}
callerVerifier.assertInjectionsPerformed(true, false);
- replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs);
+ replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
replica.run("use " + replicatedDbName_CM)
.run("select country from " + tbl + " where country == 'india'")
@@ -1573,7 +1459,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
// again from start.
InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
try {
- replica.loadFailure(replicatedDbName, tuple.dumpLocation);
+ replica.loadFailure(replicatedDbName, primaryDbName);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetAlterTableModifier();
@@ -1583,7 +1469,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
// is loaded before t2. So that scope is set to table in first iteration for table t1. In the next iteration, it
// loads only remaining partitions of t2, so that the table tracker has no tasks.
List<String> withConfigs = Arrays.asList("'hive.in.repl.test.files.sorted'='true'");
- replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+ replica.load(replicatedDbName, primaryDbName, withConfigs);
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 740f229..df304c2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -108,7 +108,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
assertFalse(primary.miniDFSCluster.getFileSystem()
.exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
@@ -128,7 +128,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
assertFalse(primary.miniDFSCluster.getFileSystem()
.exists(new Path(tuple.dumpLocation, FILE_NAME)));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't3'")
.verifyFailure(new String[] { "t3" })
@@ -155,7 +155,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
List<String> withClauseOptions = externalTableBasePathWithClause();
- replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+ replica.load(replicatedDbName, primaryDbName, withClauseOptions)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
@@ -184,7 +184,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"),
new Path(tuple.dumpLocation, FILE_NAME));
- replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+ replica.load(replicatedDbName, primaryDbName, withClauseOptions)
.run("use " + replicatedDbName)
.run("show tables like 't3'")
.verifyResult("t3")
@@ -250,13 +250,13 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
"'distcp.options.update'=''"
);
- WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.run("create external table a (i int, j int) "
+ "row format delimited fields terminated by ',' "
+ "location '" + externalTableLocation.toUri() + "'")
.dump(primaryDbName);
- replica.load(replicatedDbName, bootstrapTuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 'a'")
.verifyResults(Collections.singletonList("a"))
@@ -271,10 +271,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
outputStream.write("13,21\n".getBytes());
}
- WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)")
+ primary.run("create table b (i int)")
.dump(primaryDbName);
- replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("select i From a")
.verifyResults(new String[] { "1", "13" })
.run("select j from a")
@@ -283,11 +283,11 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
// alter table location to something new.
externalTableLocation =
new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/");
- incrementalTuple = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.run("alter table a set location '" + externalTableLocation + "'")
.dump(primaryDbName);
- replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("select i From a")
.verifyResults(Collections.emptyList());
@@ -313,7 +313,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
assertExternalFileInfo(Collections.singletonList("t2"),
new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't2'")
.verifyResults(new String[] { "t2" })
@@ -337,7 +337,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
assertExternalFileInfo(Collections.singletonList("t2"),
new Path(tuple.dumpLocation, FILE_NAME));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("select distinct(country) from t2")
.verifyResults(new String[] { "india", "australia" })
@@ -357,12 +357,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
outputStream.write("paris".getBytes());
}
- tuple = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation
.toString() + "'")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("select place from t2 where country='france'")
.verifyResults(new String[] { "paris" })
@@ -372,11 +372,11 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
String tmpLocation = "/tmp/" + System.nanoTime();
primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation), new FsPermission("777"));
- tuple = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.run("alter table t2 partition (country='france') set location '" + tmpLocation + "'")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("select place from t2 where country='france'")
.verifyResults(new String[] {})
@@ -391,19 +391,19 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
String tmpLocation2 = "/tmp/" + System.nanoTime() + "_2";
primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation2), new FsPermission("777"));
- tuple = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.run("insert into table t2 partition(country='france') values ('lyon')")
.run("alter table t2 set location '" + tmpLocation2 + "'")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause);
+ replica.load(replicatedDbName, primaryDbName, loadWithClause);
assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
}
@Test
public void externalTableIncrementalReplication() throws Throwable {
WarehouseInstance.Tuple tuple = primary.dumpWithCommand("repl dump " + primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
Path externalTableLocation =
new Path("/" + testName.getMethodName() + "/t1/");
@@ -433,7 +433,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
}
List<String> loadWithClause = externalTableBasePathWithClause();
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
@@ -454,7 +454,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
tuple = primary.dump(primaryDbName);
assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
@@ -475,7 +475,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("alter table t1 drop partition (country='us')")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("select * From t1")
.verifyResults(new String[] {})
.verifyReplTargetProperty(replicatedDbName);
@@ -507,7 +507,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
assertFalse(primary.miniDFSCluster.getFileSystem()
.exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
@@ -547,7 +547,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
tblPath = new Path(dbPath, "t3");
assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
@@ -601,7 +601,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("create table t3 as select * from t1")
.dump(primaryDbName, dumpWithClause);
- replica.load(replicatedDbName, tupleBootstrapWithoutExternal.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(tupleBootstrapWithoutExternal.lastReplicationId)
.run("use " + replicatedDbName)
@@ -639,7 +639,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
// In the retry, these half baked tables should be dropped and bootstrap should be successful.
InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
try {
- replica.loadFailure(replicatedDbName, tupleIncWithExternalBootstrap.dumpLocation, loadWithClause);
+ replica.loadFailure(replicatedDbName, primaryDbName, loadWithClause);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetAlterTableModifier();
@@ -658,7 +658,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
// So, REPL LOAD fails.
loadWithClause.add("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+ tupleBootstrapWithoutExternal.dumpLocation + "'");
- replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause);
+ replica.loadFailure(replicatedDbName, primaryDbName, loadWithClause);
loadWithClause.remove("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+ tupleBootstrapWithoutExternal.dumpLocation + "'");
@@ -668,7 +668,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
// Verify if bootstrapping with same dump is idempotent and return same result
for (int i = 0; i < 2; i++) {
- replica.load(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyFailure(new String[]{"t1"})
@@ -719,7 +719,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("insert into table t1 values (1)")
.dump(primaryDbName, dumpWithClause);
- replica.load(replicatedDbName, tupleBootstrap.dumpLocation, loadWithClause);
+ replica.load(replicatedDbName, primaryDbName, loadWithClause);
// Insert a row into "t1" and create another external table using data from "t1".
primary.run("use " + primaryDbName)
@@ -758,7 +758,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
// The newly inserted data "2" should be missing from table "t1", but table t2 should exist and
// have the inserted data.
- replica.load(replicatedDbName, tupleInc.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("select id from t1 order by id")
.verifyResult("1")
@@ -778,14 +778,14 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("insert into table t1 values (2)")
.dump(primaryDbName, dumpWithClause);
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId);
// This looks like an empty dump but it has the ALTER TABLE event created by the previous
// dump. We need it here so that the next dump won't have any events.
WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, dumpWithClause);
- replica.load(replicatedDbName, incTuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(incTuple.lastReplicationId);
@@ -800,7 +800,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
Long.valueOf(inc2Tuple.lastReplicationId).longValue());
// Incremental load to an existing database with an empty dump directory should set the repl id to the last event at the source.
- replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(inc2Tuple.lastReplicationId);
}
@@ -820,7 +820,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("insert into table t2 values (1)")
.dump(primaryDbName, dumpWithClause);
- replica.load(replicatedDbName, bootstrapDump.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(bootstrapDump.lastReplicationId)
.run("use " + replicatedDbName)
@@ -833,7 +833,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
// This looks like an empty dump but it has the ALTER TABLE event created by the previous
// dump. We need it here so that the next dump won't have any events.
WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, incTuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(incTuple.lastReplicationId);
@@ -843,7 +843,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
WarehouseInstance.Tuple inc2Tuple = primary.run("use " + primaryDbName)
.dump(primaryDbName, dumpWithClause);
- replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(inc2Tuple.lastReplicationId)
.run("use " + replicatedDbName)
@@ -869,7 +869,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("insert into table t2_constraints partition(country='france') values ('paris')")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
@@ -881,7 +881,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.verifyResults(new String[] {"1", "2"})
.verifyReplTargetProperty(replicatedDbName);
- tuple = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.run("create external table t3_bootstrap (id int)")
.run("insert into table t3_bootstrap values (10)")
.run("insert into table t3_bootstrap values (20)")
@@ -890,7 +890,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("insert into table t4_tables values (20)")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't3_bootstrap'")
.verifyResults(new String[] {"t3_bootstrap"})
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
index e9d0162..bf691f3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
@@ -107,7 +107,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
assertFalse(primary.miniDFSCluster.getFileSystem()
.exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
@@ -127,7 +127,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
assertFalse(primary.miniDFSCluster.getFileSystem()
.exists(new Path(tuple.dumpLocation, FILE_NAME)));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't3'")
.verifyFailure(new String[] {"t3"})
@@ -153,7 +153,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
List<String> withClauseOptions = externalTableBasePathWithClause();
- replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+ replica.load(replicatedDbName, primaryDbName, withClauseOptions)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
@@ -178,7 +178,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
// verify that the external table info is written correctly for incremental
assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME));
- replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+ replica.load(replicatedDbName, primaryDbName, withClauseOptions)
.run("use " + replicatedDbName)
.run("show tables like 't3'")
.verifyResult("t3")
@@ -217,7 +217,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
+ "location '" + externalTableLocation.toUri() + "'")
.dump(primaryDbName);
- replica.load(replicatedDbName, bootstrapTuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 'a'")
.verifyResults(Collections.singletonList("a"))
@@ -233,7 +233,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)")
.dump(primaryDbName);
- replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("select i From a")
.verifyResults(new String[] {})
.run("select j from a")
@@ -246,7 +246,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
.run("alter table a set location '" + externalTableLocation + "'")
.dump(primaryDbName);
- replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("select i From a")
.verifyResults(Collections.emptyList());
@@ -270,7 +270,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't2'")
.verifyResults(new String[] {"t2"})
@@ -291,7 +291,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("select distinct(country) from t2")
.verifyResults(new String[] {})
@@ -316,7 +316,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
.toString() + "'")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("select place from t2 where country='france'")
.verifyResults(new String[] {})
@@ -330,7 +330,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
.run("alter table t2 partition (country='france') set location '" + tmpLocation + "'")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("select place from t2 where country='france'")
.verifyResults(new String[] {})
@@ -346,13 +346,13 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
.run("alter table t2 set location '" + tmpLocation2 + "'")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause);
+ replica.load(replicatedDbName, primaryDbName, loadWithClause);
}
@Test
public void externalTableIncrementalReplication() throws Throwable {
WarehouseInstance.Tuple tuple = primary.dumpWithCommand("repl dump " + primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
Path externalTableLocation =
new Path("/" + testName.getMethodName() + "/t1/");
@@ -382,7 +382,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
}
List<String> loadWithClause = externalTableBasePathWithClause();
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
@@ -403,7 +403,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
tuple = primary.dump(primaryDbName);
assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
@@ -424,7 +424,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
.run("alter table t1 drop partition (country='us')")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("select * From t1")
.verifyResults(new String[] {})
.verifyReplTargetProperty(replicatedDbName);
@@ -455,7 +455,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
assertFalse(primary.miniDFSCluster.getFileSystem()
.exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
@@ -496,7 +496,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
tblPath = new Path(dbPath, "t3");
assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
@@ -542,7 +542,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
.run("insert into table t1 values (1)")
.dump(primaryDbName, dumpWithClause);
- replica.load(replicatedDbName, tupleBootstrap.dumpLocation, loadWithClause);
+ replica.load(replicatedDbName, primaryDbName, loadWithClause);
// Insert a row into "t1" and create another external table using data from "t1".
primary.run("use " + primaryDbName)
@@ -580,7 +580,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
// The newly inserted data "2" should be missing from table "t1", but table t2 should exist and
// have the inserted data.
- replica.load(replicatedDbName, tupleInc.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.run("use " + replicatedDbName)
.run("select id from t1 order by id")
.verifyResult(null)
@@ -600,14 +600,14 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
.run("insert into table t1 values (2)")
.dump(primaryDbName, dumpWithClause);
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.status(replicatedDbName)
.verifyResult(tuple.lastReplicationId);
// This looks like an empty dump but it has the ALTER TABLE event created by the previous
// dump. We need it here so that the next dump won't have any events.
WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, dumpWithClause);
- replica.load(replicatedDbName, incTuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(incTuple.lastReplicationId);
@@ -622,7 +622,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
Long.valueOf(inc2Tuple.lastReplicationId).longValue());
// Incremental load to an existing database with an empty dump directory should set the repl id to the last event at the source.
- replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, primaryDbName, loadWithClause)
.status(replicatedDbName)
.verifyResult(inc2Tuple.lastReplicationId);
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
index 15cb985..bcab190 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
@@ -85,6 +85,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
acidConfs.putAll(overrides);
primary = new WarehouseInstance(LOG, miniDFSCluster, acidConfs);
+ acidConfs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replica = new WarehouseInstance(LOG, miniDFSCluster, acidConfs);
Map<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
@@ -93,6 +94,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
put("hive.metastore.client.capability.check", "false");
put("hive.stats.autogather", "false");
}};
+ overridesForHiveConf1.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
}
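
The setup change above recurs across these test classes: since REPL LOAD no longer names a dump path, the replica must resolve dumps under the same repl root as the primary. A minimal sketch of the pattern, assuming the WarehouseInstance test harness and its repldDir field from this patch (commonOverrides is a placeholder base configuration):

    // Point the replica's hive.repl.rootdir at the primary's dump root so that
    // "REPL LOAD <srcDb> INTO <tgtDb>" can locate the dump without a FROM '<path>' clause.
    Map<String, String> replicaConf = new HashMap<>(commonOverrides);
    replicaConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
    replica = new WarehouseInstance(LOG, miniDFSCluster, replicaConf);
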
@@ -124,7 +126,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
@Test
public void testAcidTableIncrementalReplication() throws Throwable {
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
List<String> selectStmtList = new ArrayList<>();
@@ -209,7 +211,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
WarehouseInstance.Tuple incrementalDump;
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
@@ -217,7 +219,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
tableName, null, false, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
incrementalDump = primary.dump(primaryDbName);
primary.run("drop table " + primaryDbName + "." + tableName);
- replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
verifyResultsInReplicaInt(Lists.newArrayList("select count(*) from " + tableName,
"select count(*) from " + tableName + "_nopart"),
@@ -227,7 +229,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
tableNameMM, null, true, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
incrementalDump = primary.dump(primaryDbName);
primary.run("drop table " + primaryDbName + "." + tableNameMM);
- replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
verifyResultsInReplicaInt(Lists.newArrayList("select count(*) from " + tableNameMM,
"select count(*) from " + tableNameMM + "_nopart"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
index 7fa23b1..3eab045 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
@@ -100,7 +100,6 @@ public class TestReplicationWithTableMigration {
put("hive.metastore.disallow.incompatible.col.type.changes", "false");
put("hive.strict.managed.tables", "true");
}};
- replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
HashMap<String, String> configsForPrimary = new HashMap<String, String>() {{
put("fs.defaultFS", fs.getUri().toString());
@@ -118,6 +117,8 @@ public class TestReplicationWithTableMigration {
}};
configsForPrimary.putAll(overrideConfigs);
primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary);
+ hiveConfigs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
+ replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
}
private static Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException {
@@ -305,7 +306,7 @@ public class TestReplicationWithTableMigration {
return tablePath;
}
- private void loadWithFailureInAddNotification(String tbl, String dumpLocation) throws Throwable {
+ private void loadWithFailureInAddNotification(String tbl) throws Throwable {
BehaviourInjection<CallerArguments, Boolean> callerVerifier
= new BehaviourInjection<CallerArguments, Boolean>() {
@Nullable
@@ -326,7 +327,7 @@ public class TestReplicationWithTableMigration {
};
InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
try {
- replica.loadFailure(replicatedDbName, dumpLocation);
+ replica.loadFailure(replicatedDbName, primaryDbName);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier();
}
@@ -336,49 +337,49 @@ public class TestReplicationWithTableMigration {
@Test
public void testBootstrapLoadMigrationManagedToAcid() throws Throwable {
WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null);
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
}
@Test
public void testIncrementalLoadMigrationManagedToAcid() throws Throwable {
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
}
@Test
public void testIncrementalLoadMigrationManagedToAcidFailure() throws Throwable {
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
- loadWithFailureInAddNotification("tacid", tuple.dumpLocation);
+ loadWithFailureInAddNotification("tacid");
replica.run("use " + replicatedDbName)
.run("show tables like tacid")
.verifyResult(null);
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
}
@Test
public void testIncrementalLoadMigrationManagedToAcidFailurePart() throws Throwable {
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
- loadWithFailureInAddNotification("tacidpart", tuple.dumpLocation);
+ loadWithFailureInAddNotification("tacidpart");
replica.run("use " + replicatedDbName)
.run("show tables like tacidpart")
.verifyResult(null);
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
}
@Test
public void testIncrementalLoadMigrationManagedToAcidAllOp() throws Throwable {
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
List<String> selectStmtList = new ArrayList<>();
@@ -418,13 +419,13 @@ public class TestReplicationWithTableMigration {
.run("insert into avro_tbl partition (country='india') values ('another', 13)")
.dump(primaryDbName);
- replica.load(replicatedDbName, bootstrap.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
Path dataLocation = assertTablePath(replicatedDbName, "avro_tbl");
WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName)
.run("drop table avro_tbl")
.dump(primaryDbName);
- replica.load(replicatedDbName, incremental.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
// After drop, the external table data location should be auto-deleted as it is a converted one.
assertFalse(replica.miniDFSCluster.getFileSystem().exists(dataLocation));
@@ -433,15 +434,15 @@ public class TestReplicationWithTableMigration {
@Test
public void testIncConvertedExternalTableAutoDeleteDataDirOnDrop() throws Throwable {
WarehouseInstance.Tuple bootstrap = primary.dump(primaryDbName);
- replica.load(replicatedDbName, bootstrap.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
- WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.run("create table avro_tbl ROW FORMAT SERDE "
+ "'org.apache.hadoop.hive.serde2.avro.AvroSerDe' stored as avro "
+ "tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri().toString() + "')")
.run("insert into avro_tbl values ('str', 13)")
.dump(primaryDbName);
- replica.load(replicatedDbName, incremental.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
// Data location is valid and is under the default external warehouse directory.
Table avroTable = replica.getTable(replicatedDbName, "avro_tbl");
@@ -449,10 +450,10 @@ public class TestReplicationWithTableMigration {
Path dataLocation = new Path(avroTable.getSd().getLocation());
assertTrue(replica.miniDFSCluster.getFileSystem().exists(dataLocation));
- incremental = primary.run("use " + primaryDbName)
+ primary.run("use " + primaryDbName)
.run("drop table avro_tbl")
.dump(primaryDbName);
- replica.load(replicatedDbName, incremental.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
// After drop, the external table data location should be auto-deleted as it is a converted one.
assertFalse(replica.miniDFSCluster.getFileSystem().exists(dataLocation));
@@ -463,7 +464,7 @@ public class TestReplicationWithTableMigration {
List<String> withConfigs =
Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null);
- replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+ replica.load(replicatedDbName, primaryDbName, withConfigs);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
}
@@ -472,9 +473,9 @@ public class TestReplicationWithTableMigration {
List<String> withConfigs =
Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
- replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+ replica.load(replicatedDbName, primaryDbName, withConfigs);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
}
@@ -514,7 +515,7 @@ public class TestReplicationWithTableMigration {
.run("create table texternal (id int) ")
.run("insert into texternal values (1)")
.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation)
+ replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
@@ -558,7 +559,7 @@ public class TestReplicationWithTableMigration {
withConfigs.add("'hive.repl.include.external.tables'='true'");
withConfigs.add("'hive.distcp.privileged.doAs' = '" + UserGroupInformation.getCurrentUser().getUserName() + "'");
tuple = primary.dump(primaryDbName, withConfigs);
- replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+ replica.load(replicatedDbName, primaryDbName, withConfigs);
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java
index 425c8cb..1b4833c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hive.hcatalog.listener.DbNotificationListener;
@@ -80,7 +81,6 @@ public class TestReplicationWithTableMigrationEx {
put("hive.strict.managed.tables", "true");
put("hive.metastore.transactional.event.listeners", "");
}};
- replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
HashMap<String, String> configsForPrimary = new HashMap<String, String>() {{
put("fs.defaultFS", fs.getUri().toString());
@@ -96,6 +96,8 @@ public class TestReplicationWithTableMigrationEx {
}};
configsForPrimary.putAll(overrideConfigs);
primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary);
+ hiveConfigs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
+ replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
}
@AfterClass
@@ -186,13 +188,13 @@ public class TestReplicationWithTableMigrationEx {
// dump with an operation performed after the last repl id is fetched.
WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(2);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
// next incremental dump
tuple = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
}
@@ -203,13 +205,13 @@ public class TestReplicationWithTableMigrationEx {
// dump with an operation performed after the last repl id is fetched.
WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
// next incremental dump
tuple = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
}
@@ -221,13 +223,13 @@ public class TestReplicationWithTableMigrationEx {
.run("create table t1 (i int, j int)")
.dump(primaryDbName+".'t1'");
replica.run("create database " + replicatedDbName);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
tuple = primary.run("use " + primaryDbName)
.run("insert into t1 values (1, 2)")
.dump(primaryDbName+".'t1'");
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
}
@@ -243,7 +245,7 @@ public class TestReplicationWithTableMigrationEx {
// dump with an operation performed after the last repl id is fetched.
WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(2);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
replica.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"tacid"})
@@ -257,7 +259,7 @@ public class TestReplicationWithTableMigrationEx {
// next incremental dump
tuple = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
replica.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"tacid"})
@@ -270,7 +272,7 @@ public class TestReplicationWithTableMigrationEx {
assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
}
- private void loadWithFailureInAddNotification(String tbl, String dumpLocation) throws Throwable {
+ private void loadWithFailureInAddNotification(String tbl) throws Throwable {
BehaviourInjection<InjectableBehaviourObjectStore.CallerArguments, Boolean> callerVerifier
= new BehaviourInjection<InjectableBehaviourObjectStore.CallerArguments, Boolean>() {
@Nullable
@@ -294,7 +296,7 @@ public class TestReplicationWithTableMigrationEx {
try {
List<String> withClause = Collections.singletonList("'hive.metastore.transactional.event.listeners'='"
+ DbNotificationListener.class.getCanonicalName() + "'");
- replica.loadFailure(replicatedDbName, dumpLocation, withClause);
+ replica.loadFailure(replicatedDbName, primaryDbName, withClause);
} finally {
InjectableBehaviourObjectStore.resetCallerVerifier();
}
@@ -307,7 +309,7 @@ public class TestReplicationWithTableMigrationEx {
// dump with an operation performed after the last repl id is fetched.
WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters()));
@@ -317,15 +319,15 @@ public class TestReplicationWithTableMigrationEx {
.run("create table tbl_temp (fld int)")
.dump(primaryDbName);
- loadWithFailureInAddNotification("tbl_temp", tuple.dumpLocation);
+ loadWithFailureInAddNotification("tbl_temp");
Database replDb = replica.getDatabase(replicatedDbName);
assertTrue(ReplUtils.isFirstIncPending(replDb.getParameters()));
assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters()));
assertTrue(replDb.getParameters().get("dummy_key").equalsIgnoreCase("dummy_val"));
// next incremental dump
- tuple = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
}
@@ -337,13 +339,13 @@ public class TestReplicationWithTableMigrationEx {
// dump with operation after last repl id is fetched.
WarehouseInstance.Tuple tuple = dumpWithLastEventIdHacked(4);
- replica.load(replicatedDbName, tuple.dumpLocation, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
// next incremental dump
tuple = primary.dump(primaryDbName);
- replica.load(replicatedDbName, tuple.dumpLocation, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
}
@@ -382,21 +384,21 @@ public class TestReplicationWithTableMigrationEx {
// test bootstrap
alterUserName("hive");
- WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
verifyUserName("hive");
// test incremental
alterUserName("hive1");
- tuple = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
verifyUserName("hive1");
}
@Test
public void testOwnerPropagationInc() throws Throwable {
- WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
primary.run("use " + primaryDbName)
.run("create table tbl_own (fld int)")
@@ -409,8 +411,8 @@ public class TestReplicationWithTableMigrationEx {
// test incremental when the table is created in the same load
alterUserName("hive");
- tuple = primary.dump(primaryDbName);
- replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+ primary.dump(primaryDbName);
+ replica.loadWithoutExplain(replicatedDbName, primaryDbName);
verifyUserName("hive");
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
index c51bec1..afb53b8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.parse;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -76,16 +75,9 @@ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosA
}};
acidEnableConf.putAll(overrides);
-
primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+ acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
- Map<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
- put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
- put("hive.support.concurrency", "false");
- put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
- put("hive.metastore.client.capability.check", "false");
- }};
- replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
}
@Before
@@ -97,16 +89,15 @@ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosA
public void tearDown() throws Throwable {
primary.run("drop database if exists " + primaryDbName + " cascade");
replica.run("drop database if exists " + replicatedDbName + " cascade");
- replicaNonAcid.run("drop database if exists " + replicatedDbName + " cascade");
primary.run("drop database if exists " + primaryDbName + "_extra cascade");
}
@Test
- public void testAcidTablesBootstrapIncr() throws Throwable {
+ public void testAcidTablesReplLoadBootstrapIncr() throws Throwable {
// Bootstrap
primary.run("use " + primaryDbName)
.run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
- "tblproperties (\"transactional\"=\"true\")")
+ "tblproperties (\"transactional\"=\"true\")")
.run("insert into t1 values(1)")
.run("insert into t1 values(2)");
try (ScheduledQueryExecutionService schqS =
@@ -116,36 +107,53 @@ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosA
primary.run("create scheduled query s1 every 10 minutes as repl dump " + primaryDbName);
primary.run("alter scheduled query s1 execute");
Thread.sleep(6000);
- Path dumpRoot = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR), primaryDbName.toLowerCase());
- Path currdumpRoot = new Path(dumpRoot, String.valueOf(next));
- replica.load(replicatedDbName, currdumpRoot.toString());
+ replica.run("create scheduled query s2 every 10 minutes as repl load " + primaryDbName + " INTO "
+ + replicatedDbName);
+ replica.run("alter scheduled query s2 execute");
+ Thread.sleep(20000);
replica.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("select id from t1 order by id")
.verifyResults(new String[]{"1", "2"});
- // First incremental, after bootstrap
-
+ // First incremental, after bootstrap
primary.run("use " + primaryDbName)
- .run("insert into t1 values(3)")
- .run("insert into t1 values(4)");
+ .run("insert into t1 values(3)")
+ .run("insert into t1 values(4)");
next++;
ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
primary.run("alter scheduled query s1 execute");
Thread.sleep(20000);
- Path incrdumpRoot = new Path(dumpRoot, String.valueOf(next));
- replica.load(replicatedDbName, incrdumpRoot.toString());
+ replica.run("alter scheduled query s2 execute");
+ Thread.sleep(20000);
+ replica.run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("select id from t1 order by id")
+ .verifyResults(new String[]{"1", "2", "3", "4"});
+
+ // Second incremental
+ primary.run("use " + primaryDbName)
+ .run("insert into t1 values(5)")
+ .run("insert into t1 values(6)");
+ next++;
+ ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
+ primary.run("alter scheduled query s1 execute");
+ Thread.sleep(30000);
+ replica.run("alter scheduled query s2 execute");
+ Thread.sleep(30000);
replica.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("select id from t1 order by id")
- .verifyResults(new String[]{"1", "2", "3", "4"})
+ .verifyResults(new String[]{"1", "2", "3", "4", "5", "6"})
.run("drop table t1");
} finally {
primary.run("drop scheduled query s1");
+ replica.run("drop scheduled query s2");
}
}
}
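
The rewritten test above is the core scenario of HIVE-22954: dump and load both run as scheduled queries, with no dump path handed from source to target. A minimal JDBC sketch of the same flow, assuming placeholder HiveServer2 URLs and database names:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ScheduledReplicationSketch {
      public static void main(String[] args) throws Exception {
        // Source cluster: periodically dump srcdb under hive.repl.rootdir.
        try (Connection src = DriverManager.getConnection("jdbc:hive2://primary:10000/");
             Statement stmt = src.createStatement()) {
          stmt.execute("create scheduled query s1 every 10 minutes as repl dump srcdb");
        }
        // Target cluster: periodically load the latest dump for srcdb. The dump
        // location is resolved from hive.repl.rootdir, so there is no FROM clause.
        try (Connection tgt = DriverManager.getConnection("jdbc:hive2://replica:10000/");
             Statement stmt = tgt.createStatement()) {
          stmt.execute("create scheduled query s2 every 10 minutes as repl load srcdb into tgtdb");
        }
      }
    }
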
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
index 44a3805..b2733d1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
@@ -97,20 +97,21 @@ public class TestStatsReplicationScenarios {
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
}};
- Map<String, String> overrides = new HashMap<>();
+ Map<String, String> replicatedOverrides = new HashMap<>();
- overrides.putAll(additionalOverrides);
- overrides.putAll(replicaOverrides);
- replica = new WarehouseInstance(LOG, miniDFSCluster, overrides);
+ replicatedOverrides.putAll(additionalOverrides);
+ replicatedOverrides.putAll(replicaOverrides);
// Run with autogather false on primary if requested
+ Map<String, String> sourceOverrides = new HashMap<>();
hasAutogather = autogather;
additionalOverrides.put(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname,
autogather ? "true" : "false");
- overrides.clear();
- overrides.putAll(additionalOverrides);
- overrides.putAll(primaryOverrides);
- primary = new WarehouseInstance(LOG, miniDFSCluster, overrides);
+ sourceOverrides.putAll(additionalOverrides);
+ sourceOverrides.putAll(primaryOverrides);
+ primary = new WarehouseInstance(LOG, miniDFSCluster, sourceOverrides);
+ replicatedOverrides.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
+ replica = new WarehouseInstance(LOG, miniDFSCluster, replicatedOverrides);
// Use transactional tables
acidTableKindToUse = acidTableKind;
@@ -330,14 +331,14 @@ public class TestStatsReplicationScenarios {
// checkpoint for a table in the middle of the list of tables.
if (failRetry) {
if (lastReplicationId == null) {
- failBootstrapLoad(dumpTuple, tableNames.size()/2);
+ failBootstrapLoad(tableNames.size()/2);
} else {
- failIncrementalLoad(dumpTuple);
+ failIncrementalLoad();
}
}
// Load, possibly a retry
- replica.load(replicatedDbName, dumpTuple.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
// Metadata load may not load all the events.
if (!metadataOnly) {
@@ -363,9 +364,8 @@ public class TestStatsReplicationScenarios {
/**
* Run a bootstrap that will fail.
- * @param tuple the location of bootstrap dump
*/
- private void failBootstrapLoad(WarehouseInstance.Tuple tuple, int failAfterNumTables) throws Throwable {
+ private void failBootstrapLoad(int failAfterNumTables) throws Throwable {
// fail setting ckpt directory property for the second table so that we test the case when
// bootstrap load fails after some but not all tables are loaded.
BehaviourInjection<CallerArguments, Boolean> callerVerifier
@@ -391,14 +391,14 @@ public class TestStatsReplicationScenarios {
InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
try {
- replica.loadFailure(replicatedDbName, tuple.dumpLocation);
+ replica.loadFailure(replicatedDbName, primaryDbName);
callerVerifier.assertInjectionsPerformed(true, false);
} finally {
InjectableBehaviourObjectStore.resetAlterTableModifier();
}
}
- private void failIncrementalLoad(WarehouseInstance.Tuple dumpTuple) throws Throwable {
+ private void failIncrementalLoad() throws Throwable {
// fail add notification when second update table stats event is encountered. Thus we
// test successful application as well as failed application of this event.
BehaviourInjection<NotificationEvent, Boolean> callerVerifier
@@ -421,7 +421,7 @@ public class TestStatsReplicationScenarios {
InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
try {
- replica.loadFailure(replicatedDbName, dumpTuple.dumpLocation);
+ replica.loadFailure(replicatedDbName, primaryDbName);
} finally {
InjectableBehaviourObjectStore.resetAddNotificationModifier();
}
@@ -449,7 +449,7 @@ public class TestStatsReplicationScenarios {
InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
try {
- replica.loadFailure(replicatedDbName, dumpTuple.dumpLocation);
+ replica.loadFailure(replicatedDbName, primaryDbName);
} finally {
InjectableBehaviourObjectStore.resetAddNotificationModifier();
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 15b6c3d..0c44100 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -163,7 +163,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
// If the policy contains '.', it means it is table-level replication.
verifyTableListForPolicy(tuple.dumpLocation, replPolicy.contains(".'") ? expectedTables : null);
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, replPolicy, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(expectedTables)
@@ -207,7 +207,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
// If the policy contains '.', it means it is table-level replication.
verifyTableListForPolicy(tuple.dumpLocation, replPolicy.contains(".'") ? expectedTables : null);
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, replPolicy, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(expectedTables)
@@ -310,7 +310,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
public void testBasicIncrementalWithIncludeList() throws Throwable {
WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
.dump(primaryDbName);
- replica.load(replicatedDbName, tupleBootstrap.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
String[] originalNonAcidTables = new String[] {"t1", "t2"};
String[] originalFullAcidTables = new String[] {"t3", "t4"};
@@ -329,7 +329,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
public void testBasicIncrementalWithIncludeAndExcludeList() throws Throwable {
WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
.dump(primaryDbName);
- replica.load(replicatedDbName, tupleBootstrap.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
String[] originalTables = new String[] {"t1", "t11", "t2", "t3", "t111"};
createTables(originalTables, CreateTableType.NON_ACID);
@@ -373,7 +373,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
String replPolicy = primaryDbName;
WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
.dump(primaryDbName);
- replica.load(replicatedDbName, tupleBootstrap.dumpLocation);
+ replica.load(replicatedDbName, primaryDbName);
String lastReplId = tupleBootstrap.lastReplicationId;
for (String oldReplPolicy : invalidReplPolicies) {
failed = false;
@@ -504,7 +504,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"),
new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, replPolicy, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(replicatedTables)
@@ -543,7 +543,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"),
new Path(tuple.dumpLocation, FILE_NAME));
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, replPolicy, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(incrementalReplicatedTables)
@@ -695,7 +695,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
// Verify if the expected tables are bootstrapped.
verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables);
- replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ replica.load(replicatedDbName, replPolicy, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(incrementalReplicatedTables)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index f1eba52..498d59c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -304,21 +304,29 @@ public class WarehouseInstance implements Closeable {
return this;
}
- WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Throwable {
- run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+ WarehouseInstance load(String replicatedDbName, String primaryDbName) throws Throwable {
+ StringBuilder replCommand = new StringBuilder("REPL LOAD " + primaryDbName);
+ if (!StringUtils.isEmpty(replicatedDbName)) {
+ replCommand.append(" INTO " + replicatedDbName);
+ }
+ run("EXPLAIN " + replCommand.toString());
printOutput();
- run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+ run(replCommand.toString());
return this;
}
- WarehouseInstance loadWithoutExplain(String replicatedDbName, String dumpLocation) throws Throwable {
- run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "' with ('hive.exec.parallel'='true')");
+ WarehouseInstance loadWithoutExplain(String replicatedDbName, String primaryDbName) throws Throwable {
+ StringBuilder replCommand = new StringBuilder("REPL LOAD " + primaryDbName);
+ if (!StringUtils.isEmpty(replicatedDbName)) {
+ replCommand.append(" INTO " + replicatedDbName);
+ }
+ run(replCommand.toString() + " with ('hive.exec.parallel'='true')");
return this;
}
- WarehouseInstance load(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
+ WarehouseInstance load(String replicatedDbName, String primaryDbName, List<String> withClauseOptions)
throws Throwable {
- String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
+ String replLoadCmd = "REPL LOAD " + primaryDbName + " INTO " + replicatedDbName;
if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
}
@@ -338,23 +346,23 @@ public class WarehouseInstance implements Closeable {
return run(replStatusCmd);
}
- WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation) throws Throwable {
- loadFailure(replicatedDbName, dumpLocation, null);
+ WarehouseInstance loadFailure(String replicatedDbName, String primaryDbName) throws Throwable {
+ loadFailure(replicatedDbName, primaryDbName, null);
return this;
}
- WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
+ WarehouseInstance loadFailure(String replicatedDbName, String primaryDbName, List<String> withClauseOptions)
throws Throwable {
- String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
+ String replLoadCmd = "REPL LOAD " + primaryDbName + " INTO " + replicatedDbName;
if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
}
return runFailure(replLoadCmd);
}
- WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List<String> withClauseOptions,
+ WarehouseInstance loadFailure(String replicatedDbName, String primaryDbName, List<String> withClauseOptions,
int errorCode) throws Throwable {
- String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
+ String replLoadCmd = "REPL LOAD " + primaryDbName + " INTO " + replicatedDbName;
if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
}
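
With these helpers rewritten, call sites pass the source policy name instead of a dump location. A typical bootstrap round trip in the tests now reads as follows (a sketch reusing the names from this patch):

    WarehouseInstance.Tuple bootstrap = primary.dump(primaryDbName);
    // Issues "REPL LOAD <primaryDbName> INTO <replicatedDbName>"; the dump is
    // located via the shared hive.repl.rootdir, not via bootstrap.dumpLocation.
    replica.load(replicatedDbName, primaryDbName)
        .run("repl status " + replicatedDbName)
        .verifyResult(bootstrap.lastReplicationId);
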
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 914147f..dbe282d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -2850,8 +2850,6 @@ public class TestJdbcDriver2 {
ResultSet replDumpRslt = stmt.executeQuery("repl dump " + primaryDb +
" with ('hive.repl.rootdir' = '" + replDir + "')");
assertTrue(replDumpRslt.next());
- String dumpLocation = replDumpRslt.getString(1);
- String lastReplId = replDumpRslt.getString(2);
List<String> logs = stmt.getQueryLog(false, 10000);
stmt.close();
LOG.info("Query_Log for Bootstrap Dump");
@@ -2865,7 +2863,8 @@ public class TestJdbcDriver2 {
// Bootstrap load
stmt = (HiveStatement) con.createStatement();
- stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'");
+ stmt.execute("repl load " + primaryDb + " into " + replicaDb +
+ " with ('hive.repl.rootdir' = '" + replDir + "')");
logs = stmt.getQueryLog(false, 10000);
stmt.close();
LOG.info("Query_Log for Bootstrap Load");
@@ -2889,8 +2888,6 @@ public class TestJdbcDriver2 {
replDumpRslt = stmt.executeQuery("repl dump " + primaryDb +
" with ('hive.repl.rootdir' = '" + replDir + "')");
assertTrue(replDumpRslt.next());
- dumpLocation = replDumpRslt.getString(1);
- lastReplId = replDumpRslt.getString(2);
logs = stmt.getQueryLog(false, 10000);
stmt.close();
LOG.info("Query_Log for Incremental Dump");
@@ -2904,7 +2901,8 @@ public class TestJdbcDriver2 {
// Incremental load
stmt = (HiveStatement) con.createStatement();
- stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'");
+ stmt.execute("repl load " + primaryDb + " into " + replicaDb +
+ " with ('hive.repl.rootdir' = '" + replDir + "')");
logs = stmt.getQueryLog(false, 10000);
LOG.info("Query_Log for Incremental Load");
verifyFetchedLog(logs, expectedIncrementalLoadLogs);
@@ -3105,7 +3103,7 @@ public class TestJdbcDriver2 {
try {
// source db has no dump, so the derived load path does not exist
- stmt.execute("repl load default1 from '/tmp/junk'");
+ stmt.execute("repl load default into default1");
} catch(SQLException e){
assertTrue(e.getErrorCode() == ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getErrorCode());
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index a7c905b..7fa6796 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -29,6 +29,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
@@ -43,6 +44,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Base64;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@@ -1453,7 +1455,8 @@ public class TestJdbcWithMiniHS2 {
TestJdbcWithMiniHS2.class.getCanonicalName().toLowerCase().replace('.', '_') + "_"
+ System.currentTimeMillis();
String testPathName = System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
- Path testPath = new Path(testPathName + Path.SEPARATOR + testDbName);
+ Path testPath = new Path(testPathName + Path.SEPARATOR
+ + Base64.getEncoder().encodeToString(testDbName.toLowerCase().getBytes(StandardCharsets.UTF_8)));
FileSystem fs = testPath.getFileSystem(new HiveConf());
Statement stmt = conDefault.createStatement();
try {
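
The Base64 path segment added above follows the same convention that
ReplDumpTask and ReplicationSemanticAnalyzer adopt later in this patch: each
database's dumps live under a directory named by Base64-encoding the
lower-cased database name. A self-contained sketch of just that naming rule
(the class name is illustrative only):

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class DumpDirNaming {
      static String dumpDirFor(String dbName) {
        // Matches the encoding used by ReplDumpTask.execute() and
        // ReplicationSemanticAnalyzer.getCurrentLoadPath() in this patch.
        return Base64.getEncoder()
            .encodeToString(dbName.toLowerCase().getBytes(StandardCharsets.UTF_8));
      }

      public static void main(String[] args) {
        // Prints dGVzdF9yZXBsZHVtcF9hZG1pbnByaXY= -- the literal that shows up
        // in repl_dump_requires_admin.q further down in this diff.
        System.out.println(dumpDirFor("test_repldump_adminpriv"));
      }
    }
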
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 949e57b..3dcd60e 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -924,12 +924,8 @@ replDumpStatement
: KW_REPL KW_DUMP
(dbPolicy=replDbPolicy)
(KW_REPLACE oldDbPolicy=replDbPolicy)?
- (KW_FROM (eventId=Number)
- (KW_TO (rangeEnd=Number))?
- (KW_LIMIT (batchSize=Number))?
- )?
(KW_WITH replConf=replConfigs)?
- -> ^(TOK_REPL_DUMP $dbPolicy ^(TOK_REPLACE $oldDbPolicy)? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
+ -> ^(TOK_REPL_DUMP $dbPolicy ^(TOK_REPLACE $oldDbPolicy)? $replConf?)
;
replDbPolicy
@@ -943,10 +939,10 @@ replLoadStatement
@init { pushMsg("Replication load statement", state); }
@after { popMsg(state); }
: KW_REPL KW_LOAD
- (dbName=identifier)?
- KW_FROM (path=StringLiteral)
- (KW_WITH replConf=replConfigs)?
- -> ^(TOK_REPL_LOAD $path ^(TOK_DBNAME $dbName)? $replConf?)
+ (sourceDbPolicy=replDbPolicy)
+ (KW_INTO dbName=identifier)?
+ (KW_WITH replConf=replConfigs)?
+ -> ^(TOK_REPL_LOAD $sourceDbPolicy ^(TOK_DBNAME $dbName)? $replConf?)
;
replConfigs
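
The two grammar changes above are the heart of the patch: REPL DUMP loses its
FROM/TO/LIMIT event range (the dump task now derives the starting event from
the previous dump's metadata; see the ReplDumpTask hunk below), and REPL LOAD
takes a source db policy plus an optional INTO target instead of a dump path.
A rough sketch of what now parses and what no longer does, assuming the
parse() helper used by TestReplicationSemanticAnalyzer later in this diff:

    // Accepted by the revised grammar:
    ASTNode dump = parse("repl dump testDb");
    ASTNode load = parse("repl load testDb into targetTestDbName"
        + " with ('hive.repl.approx.max.load.tasks'='100')");

    // Both of these now fail to parse:
    //   parse("repl dump testDb from 100 to 200 limit 10");
    //   parse("repl load targetTestDbName from '/some/location/in/hdfs/'");
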
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f5eea15..fd06968 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -76,14 +76,14 @@ import javax.security.auth.login.LoginException;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Comparator;
+import java.util.Base64;
import java.util.ArrayList;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
@@ -121,7 +121,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
public int execute() {
try {
Hive hiveDb = getHive();
- Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), work.dbNameOrPattern.toLowerCase());
+ Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+ Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
+ .getBytes(StandardCharsets.UTF_8)));
Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
DumpMetaData dmd = new DumpMetaData(currentDumpPath, conf);
// Initialize ReplChangeManager instance since we will require it to encode file URI.
@@ -147,14 +149,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
private Long getEventFromPreviousDumpMetadata(Path dumpRoot) throws IOException, SemanticException {
FileStatus[] statuses = dumpRoot.getFileSystem(conf).listStatus(dumpRoot);
if (statuses.length > 0) {
- //sort based on last modified. Recent one is at the top
- Arrays.sort(statuses, new Comparator<FileStatus>() {
- public int compare(FileStatus f1, FileStatus f2) {
- return Long.compare(f2.getModificationTime(), f1.getModificationTime());
+ FileStatus latestUpdatedStatus = statuses[0];
+ for (FileStatus status : statuses) {
+ if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) {
+ latestUpdatedStatus = status;
}
- });
- FileStatus recentDump = statuses[0];
- DumpMetaData dmd = new DumpMetaData(recentDump.getPath(), conf);
+ }
+ DumpMetaData dmd = new DumpMetaData(latestUpdatedStatus.getPath(), conf);
if (dmd.isIncrementalDump()) {
return dmd.getEventTo();
}
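
The rewrite above also drops the full sort of the dump directories: only the
most recently modified dump is needed, so a single O(n) pass replaces the
O(n log n) Arrays.sort (and lets the Comparator import go). An equivalent
stream-based formulation, shown purely for illustration (it would need
java.util.Comparator back):

    // Illustrative alternative to the loop above, not part of the patch.
    FileStatus latestUpdatedStatus = Arrays.stream(statuses)
        .max(Comparator.comparingLong(FileStatus::getModificationTime))
        .orElseThrow(IllegalStateException::new);
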
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 2243cb6..703eb11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse;
import org.antlr.runtime.tree.Tree;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -44,10 +45,14 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Base64;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
-import java.util.Map;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
@@ -68,8 +73,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private ReplScope replScope = new ReplScope();
private ReplScope oldReplScope = null;
- // Base path for REPL LOAD
- private String path;
+ // Source DB Name for REPL LOAD
+ private String sourceDbNameOrPattern;
// Added conf member to set the REPL command specific config entries without affecting the configs
// of any other queries running in the session
private HiveConf conf;
@@ -77,6 +82,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// By default, this will be same as that of super class BaseSemanticAnalyzer. But need to obtain again
// if the Hive configs are received from WITH clause in REPL LOAD or REPL STATUS commands.
private Hive db;
+ private boolean isTargetAlreadyLoaded;
private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
@@ -280,18 +286,20 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
// REPL LOAD
- private void initReplLoad(ASTNode ast) throws SemanticException {
- path = PlanUtils.stripQuotes(ast.getChild(0).getText());
+ private void initReplLoad(ASTNode ast) throws HiveException {
+ sourceDbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
int numChildren = ast.getChildCount();
for (int i = 1; i < numChildren; i++) {
ASTNode childNode = (ASTNode) ast.getChild(i);
switch (childNode.getToken().getType()) {
- case TOK_DBNAME:
- replScope.setDbName(PlanUtils.stripQuotes(childNode.getChild(0).getText()));
- break;
- case TOK_REPL_CONFIG:
- setConfigs((ASTNode) childNode.getChild(0));
- break;
+ case TOK_DBNAME:
+ replScope.setDbName(PlanUtils.stripQuotes(childNode.getChild(0).getText()));
+ break;
+ case TOK_REPL_CONFIG:
+ setConfigs((ASTNode) childNode.getChild(0));
+ break;
+ case TOK_REPL_TABLES: // Accept TOK_REPL_TABLES for table-level repl; nothing to do here, since the dump path needs only the db name.
+ break;
default:
throw new SemanticException("Unrecognized token in REPL LOAD statement.");
}
@@ -341,25 +349,18 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
* 36/
*/
private void analyzeReplLoad(ASTNode ast) throws SemanticException {
- initReplLoad(ast);
+ try {
+ initReplLoad(ast);
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
// For analyze repl load, we walk through the dir structure available in the path,
// looking at each db, and then each table, and then setting up the appropriate
// import job in its place.
try {
- assert(path != null);
- Path loadPath = new Path(path);
- final FileSystem fs = loadPath.getFileSystem(conf);
-
- // Make fully qualified path for further use.
- loadPath = fs.makeQualified(loadPath);
-
- if (!fs.exists(loadPath)) {
- // supposed dump path does not exist.
- LOG.error("File not found " + loadPath.toUri().toString());
- throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg());
- }
-
+ assert(sourceDbNameOrPattern != null);
+ Path loadPath = getCurrentLoadPath();
// This config is set to make sure that in case of S3 replication, the move is skipped.
try {
Warehouse wh = new Warehouse(conf);
@@ -387,27 +388,79 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// At this point, all dump dirs should contain a _dumpmetadata file that
// tells us what is inside that dumpdir.
- DumpMetaData dmd = new DumpMetaData(loadPath, conf);
+ // If the target's repl status already covers the latest dump, do nothing: that dump has been loaded.
+ if (!isTargetAlreadyLoaded) {
+ DumpMetaData dmd = new DumpMetaData(loadPath, conf);
- boolean evDump = false;
- // we will decide what hdfs locations needs to be copied over here as well.
- if (dmd.isIncrementalDump()) {
- LOG.debug("{} contains an incremental dump", loadPath);
- evDump = true;
- } else {
- LOG.debug("{} contains an bootstrap dump", loadPath);
+ boolean evDump = false;
+ // We will also decide here which HDFS locations need to be copied over.
+ if (dmd.isIncrementalDump()) {
+ LOG.debug("{} contains an incremental dump", loadPath);
+ evDump = true;
+ } else {
+ LOG.debug("{} contains an bootstrap dump", loadPath);
+ }
+ ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
+ dmd.getReplScope(),
+ queryState.getLineageState(), evDump, dmd.getEventTo(),
+ dirLocationsToCopy(loadPath, evDump));
+ rootTasks.add(TaskFactory.get(replLoadWork, conf));
}
- ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
- dmd.getReplScope(),
- queryState.getLineageState(), evDump, dmd.getEventTo(),
- dirLocationsToCopy(loadPath, evDump));
- rootTasks.add(TaskFactory.get(replLoadWork, conf));
} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
throw new SemanticException(e.getMessage(), e);
}
}
+ private Path getCurrentLoadPath() throws IOException, SemanticException {
+ Path loadPathBase = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+ Base64.getEncoder().encodeToString(sourceDbNameOrPattern.toLowerCase()
+ .getBytes(StandardCharsets.UTF_8)));
+ final FileSystem fs = loadPathBase.getFileSystem(conf);
+
+ // Make fully qualified path for further use.
+ loadPathBase = fs.makeQualified(loadPathBase);
+
+ if (!fs.exists(loadPathBase)) {
+ // The derived dump path does not exist; nothing has been dumped for this source db.
+ LOG.error("File not found " + loadPathBase.toUri().toString());
+ throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg());
+ }
+ FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
+ if (statuses.length > 0) {
+ // Sort by last-modified time; the most recent dump is last.
+ Arrays.sort(statuses, new Comparator<FileStatus>() {
+ public int compare(FileStatus f1, FileStatus f2) {
+ return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+ }
+ });
+ if (replScope.getDbName() != null) {
+ String currentReplStatusOfTarget
+ = getReplStatus(replScope.getDbName());
+ if (currentReplStatusOfTarget == null) { // bootstrap: target never loaded, use the oldest dump
+ return statuses[0].getPath();
+ } else {
+ DumpMetaData latestDump = new DumpMetaData(statuses[statuses.length - 1].getPath(), conf);
+ if (Long.parseLong(currentReplStatusOfTarget.trim()) >= latestDump.getEventTo()) {
+ isTargetAlreadyLoaded = true;
+ } else {
+ for (FileStatus status : statuses) {
+ DumpMetaData dmd = new DumpMetaData(status.getPath(), conf);
+ if (dmd.isIncrementalDump()
+ && Long.parseLong(currentReplStatusOfTarget.trim()) < dmd.getEventTo()) {
+ return status.getPath();
+ }
+ }
+ }
+ }
+ } else {
+ // If dbName is null (i.e. REPL LOAD *), the target's repl status cannot be determined, so this is unsupported.
+ throw new UnsupportedOperationException("REPL LOAD * is not supported");
+ }
+ }
+ return null;
+ }
+
private List<DirCopyWork> dirLocationsToCopy(Path loadPath, boolean isIncrementalPhase)
throws HiveException, IOException {
List<DirCopyWork> list = new ArrayList<>();
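
Taken together, getCurrentLoadPath() above makes REPL LOAD re-runnable: with
no repl status on the target it starts from the oldest dump (bootstrap);
otherwise it picks the first incremental dump whose end event is beyond the
target's status; and once the target has caught up it sets
isTargetAlreadyLoaded so analyzeReplLoad schedules no work. A hedged
JDBC-style sketch of the resulting flow, with placeholder names (srcdb,
tgtdb, replDir):

    // Illustrative only; srcdb/tgtdb/replDir are placeholders.
    stmt.execute("repl dump srcdb with ('hive.repl.rootdir' = '" + replDir + "')");
    stmt.execute("repl load srcdb into tgtdb with ('hive.repl.rootdir' = '" + replDir + "')");
    // Re-running the same load is now safe: the target's repl status already
    // covers the latest dump, so no load tasks are created.
    stmt.execute("repl load srcdb into tgtdb with ('hive.repl.rootdir' = '" + replDir + "')");
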
@@ -466,10 +519,15 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private void analyzeReplStatus(ASTNode ast) throws SemanticException {
initReplStatus(ast);
-
String dbNameOrPattern = replScope.getDbName();
- String replLastId = null;
+ String replLastId = getReplStatus(dbNameOrPattern);
+ prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+ setFetchTask(createFetchTask("last_repl_id#string"));
+ LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {} using configuration {}",
+ replLastId, ctx.getResFile(), conf);
+ }
+ private String getReplStatus(String dbNameOrPattern) throws SemanticException {
try {
// Checking for status of a db
Database database = db.getDatabase(dbNameOrPattern);
@@ -477,18 +535,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
inputs.add(new ReadEntity(database));
Map<String, String> params = database.getParameters();
if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
- replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+ return params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
}
}
} catch (HiveException e) {
throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
- // codes
+ // codes
}
-
- prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
- setFetchTask(createFetchTask("last_repl_id#string"));
- LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
- replLastId, ctx.getResFile(), conf);
+ return null;
}
private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
index e606713..e91a7ed 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
@@ -78,7 +78,7 @@ public class TestParseUtils {
{"LOAD DATA LOCAL INPATH './examples/files/kv.txt' " +
" OVERWRITE INTO TABLE a", TxnType.DEFAULT},
- {"REPL LOAD a from './examples/files/kv.txt'", TxnType.DEFAULT},
+ {"REPL LOAD a INTO a", TxnType.DEFAULT},
{"REPL DUMP a", TxnType.DEFAULT},
{"REPL STATUS a", TxnType.DEFAULT},
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
index 48b9883..81ab01d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -112,40 +112,6 @@ public class TestReplicationSemanticAnalyzer {
assertDatabase(2, root);
assertTableName(root);
}
-
- @Test
- public void parseFromEventId() throws ParseException {
- ASTNode root = parse("repl dump testDb.'test_table' from 100");
- assertDatabase(3, root);
- assertTableName(root);
- assertFromEvent(1, root);
- }
-
- @Test
- public void parseToEventId() throws ParseException {
- ASTNode root = parse("repl dump testDb.'test_table' from 100 to 200");
- assertDatabase(3, root);
- assertTableName(root);
- ASTNode fromClauseRootNode = assertFromEvent(3, root);
- assertToEventId(fromClauseRootNode);
- }
-
- @Test
- public void parseLimit() throws ParseException {
- ASTNode root = parse("repl dump testDb.'test_table' from 100 to 200 limit 10");
- assertDatabase(3, root);
- assertTableName(root);
- ASTNode fromClauseRootNode = assertFromEvent(5, root);
- assertToEventId(fromClauseRootNode);
-
- ASTNode child = (ASTNode) fromClauseRootNode.getChild(3);
- assertEquals("TOK_LIMIT", child.getText());
- assertEquals(0, child.getChildCount());
-
- child = (ASTNode) fromClauseRootNode.getChild(4);
- assertEquals("10", child.getText());
- assertEquals(0, child.getChildCount());
- }
}
public static class ReplDumpWithClause {
@@ -165,66 +131,26 @@ public class TestReplicationSemanticAnalyzer {
assertTableName(root);
assertWithClause(root, 2);
}
-
- @Test
- public void parseFromEventId() throws ParseException {
- ASTNode root = parse("repl dump testDb.'test_table' from 100 "
- + "with ('key.1'='value.1','key.2'='value.2')");
- assertDatabase(4, root);
- assertTableName(root);
- assertFromEvent(1, root);
- assertWithClause(root, 3);
- }
-
- @Test
- public void parseToEventId() throws ParseException {
- ASTNode root = parse("repl dump testDb.'test_table' from 100 to 200 "
- + "with ('key.1'='value.1','key.2'='value.2')");
- assertDatabase(4, root);
- assertTableName(root);
- ASTNode fromClauseRootNode = assertFromEvent(3, root);
- assertToEventId(fromClauseRootNode);
- assertWithClause(root, 3);
- }
-
- @Test
- public void parseLimit() throws ParseException {
- ASTNode root = parse("repl dump testDb.'test_table' from 100 to 200 limit 10 "
- + "with ('key.1'='value.1','key.2'='value.2')");
- assertDatabase(4, root);
- assertTableName(root);
- ASTNode fromClauseRootNode = assertFromEvent(5, root);
- assertToEventId(fromClauseRootNode);
- assertWithClause(root, 3);
-
- ASTNode child = (ASTNode) fromClauseRootNode.getChild(3);
- assertEquals("TOK_LIMIT", child.getText());
- assertEquals(0, child.getChildCount());
-
- child = (ASTNode) fromClauseRootNode.getChild(4);
- assertEquals("10", child.getText());
- assertEquals(0, child.getChildCount());
- }
}
public static class ReplLoad {
@Test
public void parseFromLocation() throws ParseException {
- ASTNode root = parse("repl load from '/some/location/in/hdfs/'");
+ ASTNode root = parse("repl load testDbName");
assertFromLocation(1, root);
}
@Test
public void parseTargetDbName() throws ParseException {
- ASTNode root = parse("repl load targetTestDbName from '/some/location/in/hdfs/'");
+ ASTNode root = parse("repl load testDbName into targetTestDbName");
assertFromLocation(2, root);
assertTargetDatabaseName(root);
}
@Test
public void parseWithClause() throws ParseException {
- ASTNode root = parse("repl load targetTestDbName from '/some/location/in/hdfs/'"
+ ASTNode root = parse("repl load testDbName into targetTestDbName"
+ " with ('mapred.job.queue.name'='repl','hive.repl.approx.max.load.tasks'='100')");
assertFromLocation(3, root);
assertTargetDatabaseName(root);
@@ -251,7 +177,7 @@ public class TestReplicationSemanticAnalyzer {
assertEquals("TOK_REPL_LOAD", root.getText());
assertEquals(expectedNumberOfChildren, root.getChildCount());
ASTNode child = (ASTNode) root.getChild(0);
- assertEquals("'/some/location/in/hdfs/'", child.getText());
+ assertEquals("testDbName", child.getText());
assertEquals(0, child.getChildCount());
}
diff --git a/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q b/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q
index e1a6153..c227b68 100644
--- a/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q
+++ b/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q
@@ -26,7 +26,7 @@ show role grant user hive_admin_user;
show tables test_repldump_adminpriv;
repl dump test_repldump_adminpriv;
-dfs -rmr ${system:test.tmp.dir}/hrepl/test_repldump_adminpriv/next;
+dfs -rmr ${system:test.tmp.dir}/hrepl/dGVzdF9yZXBsZHVtcF9hZG1pbnByaXY=/next;
set user.name=ruser1;
show tables test_repldump_adminpriv;
diff --git a/ql/src/test/queries/clientnegative/repl_load_requires_admin.q b/ql/src/test/queries/clientnegative/repl_load_requires_admin.q
index 921b50b..0e52e9d 100644
--- a/ql/src/test/queries/clientnegative/repl_load_requires_admin.q
+++ b/ql/src/test/queries/clientnegative/repl_load_requires_admin.q
@@ -29,10 +29,10 @@ show tables test_replload_adminpriv_src;
repl dump test_replload_adminpriv_src;
-- repl load as admin should succeed
-repl load test_replload_adminpriv_tgt1 from '${system:test.tmp.dir}/hrepl/test_replload_adminpriv_src/next/';
+repl load test_replload_adminpriv_src into test_replload_adminpriv_tgt1;
show tables test_replload_adminpriv_tgt1;
set user.name=ruser1;
-- repl load as non-admin should fail
-repl load test_replload_adminpriv_tgt2 from '${system:test.tmp.dir}/hrepl/test_replload_adminpriv_src/next';
+repl load test_replload_adminpriv_src into test_replload_adminpriv_tgt2;
diff --git a/ql/src/test/queries/clientpositive/repl_load_old_version.q b/ql/src/test/queries/clientpositive/repl_load_old_version.q
deleted file mode 100644
index 11ed75d..0000000
--- a/ql/src/test/queries/clientpositive/repl_load_old_version.q
+++ /dev/null
@@ -1,10 +0,0 @@
-set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-REPL LOAD test_db from '../../data/files/repl_dump' with ('hive.exec.parallel'='false');
-use test_db;
-show tables;
-select * from tbl1 order by fld;
-select * from tbl2 order by fld;
-select * from tbl3 order by fld;
-select * from tbl4 order by fld;
-select * from tbl5 order by fld;
-select * from tbl6 order by fld1;
diff --git a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
index 1499c39..28f9d23 100644
--- a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
+++ b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
@@ -68,8 +68,9 @@ POSTHOOK: query: repl dump test_replload_adminpriv_src
POSTHOOK: type: REPLDUMP
POSTHOOK: Input: database:test_replload_adminpriv_src
#### A masked pattern was here ####
+PREHOOK: query: repl load test_replload_adminpriv_src into test_replload_adminpriv_tgt1
PREHOOK: type: REPLLOAD
-#### A masked pattern was here ####
+POSTHOOK: query: repl load test_replload_adminpriv_src into test_replload_adminpriv_tgt1
POSTHOOK: type: REPLLOAD
PREHOOK: query: show tables test_replload_adminpriv_tgt1
PREHOOK: type: SHOWTABLES