You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by da...@apache.org on 2017/01/26 20:17:27 UTC
hive git commit: HIVE-15587: Using ChangeManager to copy files in
ReplCopyTask (Daniel Dai, reviewed by Vaibhav Gumashta)
Repository: hive
Updated Branches:
refs/heads/master 85c103532 -> 318db5a35
HIVE-15587: Using ChangeManager to copy files in ReplCopyTask (Daniel Dai, reviewed by Vaibhav Gumashta)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/318db5a3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/318db5a3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/318db5a3
Branch: refs/heads/master
Commit: 318db5a3579c7f4039ee24636cd1eeb3b6633bbb
Parents: 85c1035
Author: Daniel Dai <da...@hortonworks.com>
Authored: Thu Jan 26 12:16:47 2017 -0800
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Thu Jan 26 12:16:47 2017 -0800
----------------------------------------------------------------------
.../listener/DbNotificationListener.java | 16 +-
.../hive/metastore/TestReplChangeManager.java | 26 +--
.../hive/ql/TestReplicationScenarios.java | 214 ++++++++++++++++++-
.../hive/metastore/ReplChangeManager.java | 124 +++++++++--
.../hadoop/hive/ql/exec/ReplCopyTask.java | 39 ++--
.../apache/hadoop/hive/ql/parse/EximUtil.java | 17 --
.../ql/parse/ReplicationSemanticAnalyzer.java | 57 +++--
7 files changed, 391 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 7524c49..4df2758 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.RawStoreProxy;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Function;
@@ -59,7 +59,6 @@ import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
-import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -223,7 +222,8 @@ public class DbNotificationListener extends MetaStoreEventListener {
try {
FileStatus file = files[i];
i++;
- return buildFileWithChksum(file.getPath(), fs);
+ return ReplChangeManager.encodeFileUri(file.getPath().toString(),
+ ReplChangeManager.getChksumString(file.getPath(), fs));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -520,16 +520,6 @@ public class DbNotificationListener extends MetaStoreEventListener {
}
- String buildFileWithChksum(Path p, FileSystem fs) throws IOException {
- FileChecksum cksum = fs.getFileChecksum(p);
- String chksumString = null;
- if (cksum != null) {
- chksumString =
- StringUtils.byteToHexString(cksum.getBytes(), 0, cksum.getLength());
- }
- return encodeFileUri(p.toString(), chksumString);
- }
-
// TODO: this needs to be enhanced once change management based filesystem is implemented
// Currently using fileuri#checksum as the format
private String encodeFileUri(String fileUriStr, String fileChecksum) {
http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
index 205c640..1ac4d01 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
@@ -53,6 +53,7 @@ public class TestReplChangeManager {
private static Warehouse warehouse;
private static MiniDFSCluster m_dfs;
private static String cmroot;
+ private static FileSystem fs;
@BeforeClass
public static void setUp() throws Exception {
@@ -65,6 +66,7 @@ public class TestReplChangeManager {
hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot);
hiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
warehouse = new Warehouse(hiveConf);
+ fs = new Path(cmroot).getFileSystem(hiveConf);
try {
client = new HiveMetaStoreClient(hiveConf);
} catch (Throwable e) {
@@ -151,15 +153,15 @@ public class TestReplChangeManager {
Path part1Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160101")), "part");
createFile(part1Path, "p1");
- String path1Chksum = ReplChangeManager.getCksumString(part1Path, hiveConf);
+ String path1Chksum = ReplChangeManager.getChksumString(part1Path, fs);
Path part2Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160102")), "part");
createFile(part2Path, "p2");
- String path2Chksum = ReplChangeManager.getCksumString(part2Path, hiveConf);
+ String path2Chksum = ReplChangeManager.getChksumString(part2Path, fs);
Path part3Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160103")), "part");
createFile(part3Path, "p3");
- String path3Chksum = ReplChangeManager.getCksumString(part3Path, hiveConf);
+ String path3Chksum = ReplChangeManager.getChksumString(part3Path, fs);
Assert.assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path));
Assert.assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path));
@@ -221,15 +223,15 @@ public class TestReplChangeManager {
Path filePath1 = new Path(warehouse.getTablePath(db, tblName), "part1");
createFile(filePath1, "f1");
- String fileChksum1 = ReplChangeManager.getCksumString(filePath1, hiveConf);
+ String fileChksum1 = ReplChangeManager.getChksumString(filePath1, fs);
Path filePath2 = new Path(warehouse.getTablePath(db, tblName), "part2");
createFile(filePath2, "f2");
- String fileChksum2 = ReplChangeManager.getCksumString(filePath2, hiveConf);
+ String fileChksum2 = ReplChangeManager.getChksumString(filePath2, fs);
Path filePath3 = new Path(warehouse.getTablePath(db, tblName), "part3");
createFile(filePath3, "f3");
- String fileChksum3 = ReplChangeManager.getCksumString(filePath3, hiveConf);
+ String fileChksum3 = ReplChangeManager.getChksumString(filePath3, fs);
Assert.assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1));
Assert.assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2));
@@ -267,26 +269,26 @@ public class TestReplChangeManager {
fs.mkdirs(dirTbl1);
Path part11 = new Path(dirTbl1, "part1");
createFile(part11, "testClearer11");
- String fileChksum11 = ReplChangeManager.getCksumString(part11, hiveConf);
+ String fileChksum11 = ReplChangeManager.getChksumString(part11, fs);
Path part12 = new Path(dirTbl1, "part2");
createFile(part12, "testClearer12");
- String fileChksum12 = ReplChangeManager.getCksumString(part12, hiveConf);
+ String fileChksum12 = ReplChangeManager.getChksumString(part12, fs);
Path dirTbl2 = new Path(dirDb, "tbl2");
fs.mkdirs(dirTbl2);
Path part21 = new Path(dirTbl2, "part1");
createFile(part21, "testClearer21");
- String fileChksum21 = ReplChangeManager.getCksumString(part21, hiveConf);
+ String fileChksum21 = ReplChangeManager.getChksumString(part21, fs);
Path part22 = new Path(dirTbl2, "part2");
createFile(part22, "testClearer22");
- String fileChksum22 = ReplChangeManager.getCksumString(part22, hiveConf);
+ String fileChksum22 = ReplChangeManager.getChksumString(part22, fs);
Path dirTbl3 = new Path(dirDb, "tbl3");
fs.mkdirs(dirTbl3);
Path part31 = new Path(dirTbl3, "part1");
createFile(part31, "testClearer31");
- String fileChksum31 = ReplChangeManager.getCksumString(part31, hiveConf);
+ String fileChksum31 = ReplChangeManager.getChksumString(part31, fs);
Path part32 = new Path(dirTbl3, "part2");
createFile(part32, "testClearer32");
- String fileChksum32 = ReplChangeManager.getCksumString(part32, hiveConf);
+ String fileChksum32 = ReplChangeManager.getChksumString(part32, fs);
ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, false);
ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, false);
http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 5be3e9c..7836c47 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -90,9 +90,11 @@ public class TestReplicationScenarios {
WindowsPathUtil.convertPathsFromWindowsToHdfs(hconf);
}
- System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname,
+ hconf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
- msPort = MetaStoreUtils.startMetaStore();
+ hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+ hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
+ msPort = MetaStoreUtils.startMetaStore(hconf);
hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
+ msPort);
@@ -193,6 +195,87 @@ public class TestReplicationScenarios {
}
@Test
+ public void testBasicWithCM() throws Exception {
+
+ String testName = "basic_with_cm";
+ LOG.info("Testing "+testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+ String ptn_locn_2_later = new Path(TEST_PATH , testName + "_ptn2_later").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+ createTestDataFile(ptn_locn_2_later, ptn_data_2_later);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ run("SELECT * from " + dbName + ".unptned");
+ verifyResults(unptn_data);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+ run("SELECT a from " + dbName + ".ptned WHERE b=1");
+ verifyResults(ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+ run("SELECT a from " + dbName + ".ptned WHERE b=2");
+ verifyResults(ptn_data_2);
+ run("SELECT a from " + dbName + ".ptned_empty");
+ verifyResults(empty);
+ run("SELECT * from " + dbName + ".unptned_empty");
+ verifyResults(empty);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+
+ // Table dropped after "repl dump"
+ run("DROP TABLE " + dbName + ".unptned");
+ // Partition dropped after "repl dump"
+ run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)");
+ // File changed after "repl dump"
+ Partition p = metaStoreClient.getPartition(dbName, "ptned", "b=2");
+ Path loc = new Path(p.getSd().getLocation());
+ FileSystem fs = loc.getFileSystem(hconf);
+ Path file = fs.listStatus(loc)[0].getPath();
+ fs.delete(file, false);
+ fs.copyFromLocalFile(new Path(ptn_locn_2_later), file);
+
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ run("REPL STATUS " + dbName + "_dupe");
+ verifyResults(new String[] {replDumpId});
+
+ run("SELECT * from " + dbName + "_dupe.unptned");
+ verifyResults(unptn_data);
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1");
+ verifyResults(ptn_data_1);
+ // Since the file in partition(b=2) was replaced manually, Hive can find it neither
+ // in the original location nor in cmroot, so the result is empty
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+ verifyResults(empty);
+ run("SELECT a from " + dbName + ".ptned_empty");
+ verifyResults(empty);
+ run("SELECT * from " + dbName + ".unptned_empty");
+ verifyResults(empty);
+ }
+
+ @Test
public void testIncrementalAdds() throws IOException {
String testName = "incrementalAdds";
LOG.info("Testing "+testName);
@@ -319,7 +402,6 @@ public class TestReplicationScenarios {
run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=2)");
verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2);
-
// At this point, we've set up all the tables and ptns we're going to test drops across
// Replicate it first, and then we'll drop it on the source.
@@ -393,6 +475,132 @@ public class TestReplicationScenarios {
}
@Test
+ public void testDropsWithCM() throws IOException {
+ // Replicates table/partition drops: initial dump+load, drops on the source, a dump FROM the first dump id, more drops after that dump, then a second load into <db>_dupe.
+ String testName = "drops_with_cm";
+ LOG.info("Testing "+testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ run("SELECT * from " + dbName + ".unptned");
+ verifyResults(unptn_data);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')");
+ run("SELECT a from " + dbName + ".ptned WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')");
+ run("SELECT a from " + dbName + ".ptned WHERE b='2'");
+ verifyResults(ptn_data_2);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')");
+ run("SELECT a from " + dbName + ".ptned2 WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
+ run("SELECT a from " + dbName + ".ptned2 WHERE b='2'");
+ verifyResults(ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName); // first dump of the whole database (no FROM clause)
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ run("REPL STATUS " + dbName + "_dupe");
+ verifyResults(new String[] {replDumpId});
+
+ run("SELECT * from " + dbName + "_dupe.unptned");
+ verifyResults(unptn_data);
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'");
+ verifyResults(ptn_data_2);
+ run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'");
+ verifyResults(ptn_data_2);
+
+ run("CREATE TABLE " + dbName + ".unptned_copy" + " AS SELECT a FROM " + dbName + ".unptned");
+ run("CREATE TABLE " + dbName + ".ptned_copy" + " LIKE " + dbName + ".ptned");
+ run("INSERT INTO TABLE " + dbName + ".ptned_copy" + " PARTITION(b='1') SELECT a FROM " +
+ dbName + ".ptned WHERE b='1'");
+ run("SELECT a from " + dbName + ".unptned_copy");
+ verifyResults(unptn_data);
+ run("SELECT a from " + dbName + ".ptned_copy");
+ verifyResults(ptn_data_1);
+
+ run("DROP TABLE " + dbName + ".unptned");
+ run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')");
+ run("DROP TABLE " + dbName + ".ptned2");
+ run("SELECT a from " + dbName + ".ptned WHERE b=2");
+ verifyResults(empty);
+ run("SELECT a from " + dbName + ".ptned");
+ verifyResults(ptn_data_1);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId); // second dump, starting from the first dump's id
+ String postDropReplDumpLocn = getResult(0,0);
+ String postDropReplDumpId = getResult(0,1,true);
+ LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
+
+ // Drop table after dump
+ run("DROP TABLE " + dbName + ".unptned_copy");
+ // Drop partition after dump
+ run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')");
+
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+
+ Exception e = null;
+ try {
+ Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned");
+ assertNull(tbl);
+ } catch (TException te) {
+ e = te;
+ }
+ assertNotNull(e);
+ assertEquals(NoSuchObjectException.class, e.getClass());
+
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+ verifyResults(empty);
+ run("SELECT a from " + dbName + "_dupe.ptned");
+ verifyResults(ptn_data_1);
+
+ Exception e2 = null;
+ try {
+ Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2");
+ assertNull(tbl);
+ } catch (TException te) {
+ e2 = te;
+ }
+ assertNotNull(e2);
+ assertEquals(NoSuchObjectException.class, e2.getClass()); // check the ptned2 failure (e2), not the earlier unptned one (e)
+
+ run("SELECT a from " + dbName + "_dupe.unptned_copy"); // dropped on the source only after the second dump, so still expected in the replica
+ verifyResults(unptn_data);
+ run("SELECT a from " + dbName + "_dupe.ptned_copy");
+ verifyResults(ptn_data_1);
+ }
+
+ @Test
public void testAlters() throws IOException {
String testName = "alters";
http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 99cba9d..51e4627 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -55,6 +55,7 @@ public class ReplChangeManager {
public static final String ORIG_LOC_TAG = "user.original-loc";
public static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
+ public static final String URI_FRAGMENT_SEPARATOR = "#";
public static ReplChangeManager getInstance(HiveConf hiveConf) throws MetaException {
if (instance == null) {
@@ -121,7 +122,7 @@ public class ReplChangeManager {
count += recycle(file.getPath(), ifPurge);
}
} else {
- Path cmPath = getCMPath(path, hiveConf, getCksumString(path, hiveConf));
+ Path cmPath = getCMPath(path, hiveConf, getChksumString(path, fs));
if (LOG.isDebugEnabled()) {
LOG.debug("Moving " + path.toString() + " to " + cmPath.toString());
@@ -151,7 +152,11 @@ public class ReplChangeManager {
// Note we currently only track the last known trace as
// xattr has limited capacity. We shall revisit and store all original
// locations if orig-loc becomes important
- fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+ try {
+ fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+ } catch (UnsupportedOperationException e) {
+ LOG.warn("Error setting xattr for " + path.toString());
+ }
count++;
}
@@ -159,7 +164,11 @@ public class ReplChangeManager {
// If multiple files share the same content, then
// any file claim remain in trash would be granted
if (!ifPurge) {
- fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0});
+ try {
+ fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0});
+ } catch (UnsupportedOperationException e) {
+ LOG.warn("Error setting xattr for " + cmPath.toString());
+ }
}
}
return count;
@@ -169,16 +178,22 @@ public class ReplChangeManager {
}
// Get checksum of a file
- static public String getCksumString(Path path, Configuration conf) throws IOException {
+ static public String getChksumString(Path path, FileSystem fs) throws IOException {
// TODO: fs checksum only available on hdfs, need to
// find a solution for other fs (eg, local fs, s3, etc)
- FileSystem fs = path.getFileSystem(conf);
+ String checksumString = null;
FileChecksum checksum = fs.getFileChecksum(path);
- String checksumString = StringUtils.byteToHexString(
- checksum.getBytes(), 0, checksum.getLength());
+ if (checksum != null) {
+ checksumString = StringUtils.byteToHexString(
+ checksum.getBytes(), 0, checksum.getLength());
+ }
return checksumString;
}
+ static public void setCmRoot(Path cmRoot) {
+ ReplChangeManager.cmroot = cmRoot;
+ }
+
/***
* Convert a path of file inside a partition or table (if non-partitioned)
* to a deterministic location of cmroot. So user can retrieve the file back
@@ -205,6 +220,69 @@ public class ReplChangeManager {
return cmPath;
}
+ /***
+ * Get original file specified by src and chksumString. If the file exists and checksum
+ * matches, return the file; otherwise, use chksumString to retrieve it from cmroot
+ * @param src Original file location
+ * @param chksumString Checksum of the original file
+ * @param conf
+ * @return Corresponding FileStatus object
+ * @throws MetaException
+ */
+ static public FileStatus getFileStatus(Path src, String chksumString,
+ HiveConf conf) throws MetaException {
+ try {
+ FileSystem srcFs = src.getFileSystem(conf);
+ if (chksumString == null) {
+ return srcFs.getFileStatus(src);
+ }
+
+ if (!srcFs.exists(src)) {
+ return srcFs.getFileStatus(getCMPath(src, conf, chksumString));
+ }
+
+ String currentChksumString = getChksumString(src, srcFs);
+ if (currentChksumString == null || chksumString.equals(currentChksumString)) {
+ return srcFs.getFileStatus(src);
+ } else {
+ return srcFs.getFileStatus(getCMPath(src, conf, chksumString));
+ }
+ } catch (IOException e) {
+ throw new MetaException(StringUtils.stringifyException(e));
+ }
+ }
+
+ /***
+ * Concatenate filename and checksum with "#"
+ * @param fileUriStr Filename string
+ * @param fileChecksum Checksum string
+ * @return Concatenated Uri string
+ */
+ // TODO: this needs to be enhanced once change management based filesystem is implemented
+ // Currently using fileuri#checksum as the format
+ static public String encodeFileUri(String fileUriStr, String fileChecksum) {
+ if (fileChecksum != null) {
+ return fileUriStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
+ } else {
+ return fileUriStr;
+ }
+ }
+
+ /***
+ * Split uri with fragment into file uri and checksum
+ * @param fileURIStr uri with fragment
+ * @return array of file name and checksum
+ */
+ static public String[] getFileWithChksumFromURI(String fileURIStr) {
+ String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR);
+ String[] result = new String[2];
+ result[0] = uriAndFragment[0];
+ if (uriAndFragment.length>1) {
+ result[1] = uriAndFragment[1];
+ }
+ return result;
+ }
+
/**
* Thread to clear old files of cmroot recursively
*/
@@ -231,24 +309,28 @@ public class ReplChangeManager {
for (FileStatus file : files) {
long modifiedTime = file.getModificationTime();
if (now - modifiedTime > secRetain*1000) {
- if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
- boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), hiveConf);
- if (succ) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Move " + file.toString() + " to trash");
+ try {
+ if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
+ boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), hiveConf);
+ if (succ) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Move " + file.toString() + " to trash");
+ }
+ } else {
+ LOG.warn("Fail to move " + file.toString() + " to trash");
}
} else {
- LOG.warn("Fail to move " + file.toString() + " to trash");
- }
- } else {
- boolean succ = fs.delete(file.getPath(), false);
- if (succ) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Remove " + file.toString());
+ boolean succ = fs.delete(file.getPath(), false);
+ if (succ) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Remove " + file.toString());
+ }
+ } else {
+ LOG.warn("Fail to remove " + file.toString());
}
- } else {
- LOG.warn("Fail to remove " + file.toString());
}
+ } catch (UnsupportedOperationException e) {
+ LOG.warn("Error getting xattr for " + file.getPath().toString());
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index e6b943b..4686e2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.exec;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.plan.CopyWork;
@@ -126,15 +128,16 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
for (FileStatus oneSrc : srcFiles) {
console.printInfo("Copying file: " + oneSrc.getPath().toString());
LOG.debug("Copying file: " + oneSrc.getPath().toString());
+
+ FileSystem actualSrcFs = null;
+ if (rwork.getReadListFromInput()){
+ // TODO : filesystemcache prevents this from being a perf nightmare, but we
+ // should still probably follow up to see if we need to do something better here.
+ actualSrcFs = oneSrc.getPath().getFileSystem(conf);
+ } else {
+ actualSrcFs = srcFs;
+ }
if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){
- FileSystem actualSrcFs = null;
- if (rwork.getReadListFromInput()){
- // TODO : filesystemcache prevents this from being a perf nightmare, but we
- // should still probably follow up to see if we need to do something better here.
- actualSrcFs = oneSrc.getPath().getFileSystem(conf);
- } else {
- actualSrcFs = srcFs;
- }
LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath,
@@ -148,7 +151,9 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
}else{
LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri());
console.printInfo("Tracking file: " + oneSrc.getPath().toUri());
- listBW.write(oneSrc.getPath().toUri().toString() + "\n");
+ String chksumString = ReplChangeManager.getChksumString(oneSrc.getPath(), actualSrcFs);
+ listBW.write(ReplChangeManager.encodeFileUri
+ (oneSrc.getPath().toUri().toString(), chksumString) + "\n");
}
}
@@ -183,12 +188,16 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
String line = null;
while ( (line = br.readLine()) != null){
LOG.debug("ReplCopyTask :_filesReadLine:" + line);
- String fileUriStr = EximUtil.getCMDecodedFileName(line);
- // TODO HIVE-15490: Add checksum validation here
- Path p = new Path(fileUriStr);
- // TODO: again, fs cache should make this okay, but if not, revisit
- FileSystem srcFs = p.getFileSystem(conf);
- ret.add(srcFs.getFileStatus(p));
+
+ String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line);
+ try {
+ FileStatus f = ReplChangeManager.getFileStatus(new Path(fileWithChksum[0]),
+ fileWithChksum[1], conf);
+ ret.add(f);
+ } catch (MetaException e) {
+ // skip and issue warning for missing file
+ LOG.warn("Cannot find " + fileWithChksum[0] + " in source repo or cmroot");
+ }
// Note - we need srcFs rather than fs, because it is possible that the _files lists files
// which are from a different filesystem than the fs where the _files file itself was loaded
// from. Currently, it is possible, for eg., to do REPL LOAD hdfs://<ip>/dir/ and for the _files
http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 34e53d2..796ccc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -78,7 +77,6 @@ public class EximUtil {
public static final String METADATA_NAME = "_metadata";
public static final String FILES_NAME = "_files";
public static final String DATA_PATH_NAME = "data";
- public static final String URI_FRAGMENT_SEPARATOR = "#";
private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class);
@@ -574,19 +572,4 @@ public class EximUtil {
};
}
- public static String getCMEncodedFileName(String fileURIStr, String fileChecksum) {
- // The checksum is set as the fragment portion of the file uri
- return fileURIStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
- }
-
- public static String getCMDecodedFileName(String encodedFileURIStr) {
- String[] uriAndFragment = encodedFileURIStr.split(URI_FRAGMENT_SEPARATOR);
- return uriAndFragment[0];
- }
-
- public static FileChecksum getCMDecodedChecksum(String encodedFileURIStr) {
- // TODO: Implement this as part of HIVE-15490
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 86b6a6e..2b327db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -141,22 +142,24 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private final Path dumpRoot;
private final Path dumpFile;
+ private Path cmRoot;
public DumpMetaData(Path dumpRoot) {
this.dumpRoot = dumpRoot;
dumpFile = new Path(dumpRoot, DUMPMETADATA);
}
- public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo){
+ public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){
this(dumpRoot);
- setDump(lvl, eventFrom, eventTo);
+ setDump(lvl, eventFrom, eventTo, cmRoot);
}
- public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo){
+ public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){
this.dumpType = lvl;
this.eventFrom = eventFrom;
this.eventTo = eventTo;
this.initialized = true;
+ this.cmRoot = cmRoot;
}
public void loadDumpFromFile() throws SemanticException {
@@ -166,9 +169,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
String line = null;
if ( (line = br.readLine()) != null){
- String[] lineContents = line.split("\t", 4);
- setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]));
- setPayload(lineContents[3].equals(Utilities.nullStringOutput) ? null : lineContents[3]);
+ String[] lineContents = line.split("\t", 5);
+ setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]),
+ new Path(lineContents[3]));
+ setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? null : lineContents[4]);
+ ReplChangeManager.setCmRoot(cmRoot);
} else {
throw new IOException("Unable to read valid values from dumpFile:"+dumpFile.toUri().toString());
}
@@ -201,6 +206,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
return eventTo;
}
+ public Path getCmRoot() {
+ return cmRoot;
+ }
+
+ public void setCmRoot(Path cmRoot) {
+ this.cmRoot = cmRoot;
+ }
+
public Path getDumpFilePath() {
return dumpFile;
}
@@ -217,7 +230,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
public void write() throws SemanticException {
- writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(), payload), dumpFile);
+ writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(),
+ cmRoot.toString(), payload), dumpFile);
}
}
@@ -301,6 +315,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
Path dumpRoot = new Path(replRoot, getNextDumpDir());
DumpMetaData dmd = new DumpMetaData(dumpRoot);
+ Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
Long lastReplId;
try {
if (eventFrom == null){
@@ -340,7 +355,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
LOG.info(
"Consolidation done, preparing to return {},{}->{}",
dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
- dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId);
+ dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
dmd.write();
// Set the correct last repl id to return to the user
@@ -375,14 +390,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
while (evIter.hasNext()){
NotificationEvent ev = evIter.next();
Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
- dumpEvent(ev, evRoot);
+ dumpEvent(ev, evRoot, cmRoot);
}
LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), eventTo);
writeOutput(
Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(eventTo)),
dmd.getDumpFilePath());
- dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo);
+ dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo, cmRoot);
dmd.write();
// Set the correct last repl id to return to the user
lastReplId = eventTo;
@@ -396,7 +411,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception {
+ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
long evid = ev.getEventId();
String evidStr = String.valueOf(evid);
ReplicationSpec replicationSpec = getNewEventOnlyReplicationSpec(evidStr);
@@ -439,7 +454,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid)).write();
+ (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid, cmRoot)).write();
break;
}
case MessageFactory.ADD_PARTITION_EVENT : {
@@ -504,19 +519,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid)).write();
+ (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid, cmRoot)).write();
break;
}
case MessageFactory.DROP_TABLE_EVENT : {
LOG.info("Processing#{} DROP_TABLE message : {}", ev.getEventId(), ev.getMessage());
- DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_TABLE, evid, evid);
+ DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_TABLE, evid, evid, cmRoot);
dmd.setPayload(ev.getMessage());
dmd.write();
break;
}
case MessageFactory.DROP_PARTITION_EVENT : {
LOG.info("Processing#{} DROP_PARTITION message : {}", ev.getEventId(), ev.getMessage());
- DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_PARTITION, evid, evid);
+ DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_PARTITION, evid, evid, cmRoot);
dmd.setPayload(ev.getMessage());
dmd.write();
break;
@@ -540,12 +555,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
null,
replicationSpec);
- DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid);
+ DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid, cmRoot);
dmd.setPayload(ev.getMessage());
dmd.write();
} else {
// rename scenario
- DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid);
+ DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid, cmRoot);
dmd.setPayload(ev.getMessage());
dmd.write();
}
@@ -582,13 +597,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
qlMdTable,
qlPtns,
replicationSpec);
- DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid);
+ DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid, cmRoot);
dmd.setPayload(ev.getMessage());
dmd.write();
break;
} else {
// rename scenario
- DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid);
+ DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid, cmRoot);
dmd.setPayload(ev.getMessage());
dmd.write();
break;
@@ -626,7 +641,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
LOG.info("Processing#{} INSERT message : {}", ev.getEventId(), ev.getMessage());
- DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid);
+ DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid, cmRoot);
dmd.setPayload(ev.getMessage());
dmd.write();
break;
@@ -634,7 +649,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// TODO : handle other event types
default:
LOG.info("Dummy processing#{} message : {}", ev.getEventId(), ev.getMessage());
- DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_UNKNOWN, evid, evid);
+ DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_UNKNOWN, evid, evid, cmRoot);
dmd.setPayload(ev.getMessage());
dmd.write();
break;