Posted to commits@hive.apache.org by vg...@apache.org on 2016/12/20 22:48:15 UTC
hive git commit: HIVE-15466: REPL LOAD & DUMP support for incremental DROP_TABLE/DROP_PTN (Sushanth Sowmyan reviewed by Vaibhav Gumashta)
Repository: hive
Updated Branches:
refs/heads/master 5efd20c7f -> ab9b21920
HIVE-15466: REPL LOAD & DUMP support for incremental DROP_TABLE/DROP_PTN (Sushanth Sowmyan reviewed by Vaibhav Gumashta)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ab9b2192
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ab9b2192
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ab9b2192
Branch: refs/heads/master
Commit: ab9b21920c0b624822850e32197e7d6575fd5fb4
Parents: 5efd20c
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Tue Dec 20 14:47:55 2016 -0800
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Tue Dec 20 14:47:55 2016 -0800
----------------------------------------------------------------------
.../hive/ql/TestReplicationScenarios.java | 116 +++++++
.../messaging/json/JSONMessageFactory.java | 6 +-
.../hive/ql/parse/DDLSemanticAnalyzer.java | 2 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 347 +++++++++++++++++--
4 files changed, 435 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
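For orientation, the replication cycle this patch extends looks roughly like the
following (a minimal sketch mirroring the new testDrops test below; the Driver
setup is the test harness's, and bootstrapLocn, lastReplId and incrementalLocn
are hypothetical variables parsed from the dump output rows):

    // Bootstrap once, then replicate drops incrementally.
    driver.run("REPL DUMP testdb");                    // returns <dump_dir, last_repl_id>
    driver.run("REPL LOAD testdb_dupe FROM '" + bootstrapLocn + "'");

    driver.run("DROP TABLE testdb.unptned");           // logged as a DROP_TABLE event
    driver.run("ALTER TABLE testdb.ptned DROP PARTITION (b='2')"); // DROP_PARTITION event

    driver.run("REPL DUMP testdb FROM " + lastReplId); // incremental dump of the events above
    driver.run("REPL LOAD testdb_dupe FROM '" + incrementalLocn + "'"); // replays the drops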
http://git-wip-us.apache.org/repos/asf/hive/blob/ab9b2192/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 3ac5ba7..d2696be 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -21,11 +21,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.Shell;
+import org.apache.thrift.TException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -41,6 +45,8 @@ import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
public class TestReplicationScenarios {
@@ -54,6 +60,7 @@ public class TestReplicationScenarios {
static boolean useExternalMS = false;
static int msPort;
static Driver driver;
+ static HiveMetaStoreClient metaStoreClient;
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
private ArrayList<String> lastResults;
@@ -91,6 +98,7 @@ public class TestReplicationScenarios {
driver = new Driver(hconf);
SessionState.start(new CliSessionState(hconf));
+ metaStoreClient = new HiveMetaStoreClient(hconf);
}
@AfterClass
@@ -285,6 +293,114 @@ public class TestReplicationScenarios {
verifyResults(ptn_data_1);
run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2");
verifyResults(ptn_data_2);
+
+ }
+
+ @Test
+ public void testDrops() throws IOException {
+
+ String testName = "drops";
+ LOG.info("Testing "+testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ run("SELECT * from " + dbName + ".unptned");
+ verifyResults(unptn_data);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')");
+ run("SELECT a from " + dbName + ".ptned WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')");
+ run("SELECT a from " + dbName + ".ptned WHERE b='2'");
+ verifyResults(ptn_data_2);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')");
+ run("SELECT a from " + dbName + ".ptned2 WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
+ run("SELECT a from " + dbName + ".ptned2 WHERE b='2'");
+ verifyResults(ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ run("REPL STATUS " + dbName + "_dupe");
+ verifyResults(new String[] {replDumpId});
+
+ run("SELECT * from " + dbName + "_dupe.unptned");
+ verifyResults(unptn_data);
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'");
+ verifyResults(ptn_data_2);
+ run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'");
+ verifyResults(ptn_data_1);
+ run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'");
+ verifyResults(ptn_data_2);
+
+ run("DROP TABLE " + dbName + ".unptned");
+ run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')");
+ run("DROP TABLE " + dbName + ".ptned2");
+ run("SELECT a from " + dbName + ".ptned WHERE b=2");
+ verifyResults(empty);
+ run("SELECT a from " + dbName + ".ptned");
+ verifyResults(ptn_data_1);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId);
+ String postDropReplDumpLocn = getResult(0,0);
+ String postDropReplDumpId = getResult(0,1,true);
+ LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+
+ Exception e = null;
+ try {
+ Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned");
+ assertNull(tbl);
+ } catch (TException te) {
+ e = te;
+ }
+ assertNotNull(e);
+ assertEquals(NoSuchObjectException.class, e.getClass());
+
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+ verifyResults(empty);
+ run("SELECT a from " + dbName + "_dupe.ptned");
+ verifyResults(ptn_data_1);
+
+ Exception e2 = null;
+ try {
+ Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2");
+ assertNull(tbl);
+ } catch (TException te) {
+ e2 = te;
+ }
+ assertNotNull(e2);
+ assertEquals(NoSuchObjectException.class, e2.getClass());
+
}
private String getResult(int rowNum, int colNum) throws IOException {
http://git-wip-us.apache.org/repos/asf/hive/blob/ab9b2192/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index c57f577..17e7686 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -221,7 +221,11 @@ public class JSONMessageFactory extends MessageFactory {
}
public static ObjectNode getJsonTree(NotificationEvent event) throws Exception {
- JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage());
+ return getJsonTree(event.getMessage());
+ }
+
+ public static ObjectNode getJsonTree(String eventMessage) throws Exception {
+ JsonParser jsonParser = (new JsonFactory()).createJsonParser(eventMessage);
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonParser, ObjectNode.class);
}
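The new String overload means callers that only hold a raw event message (for
example, a payload stashed in a dump's _dumpmetadata file rather than a live
NotificationEvent) can reuse the same JSON parsing. A minimal sketch, assuming
the same jackson and metastore imports as above (parseDropEvent is a
hypothetical caller, not part of this patch):

    // Both entry points now funnel into one parser.
    public static ObjectNode parseDropEvent(NotificationEvent event) throws Exception {
      // Identical result to JSONMessageFactory.getJsonTree(event), which now delegates here.
      return JSONMessageFactory.getJsonTree(event.getMessage());
    }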
http://git-wip-us.apache.org/repos/asf/hive/blob/ab9b2192/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 050522f..0f472e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -3132,7 +3132,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
return result;
}
- private static ExprNodeGenericFuncDesc makeBinaryPredicate(
+ public static ExprNodeGenericFuncDesc makeBinaryPredicate(
String fn, ExprNodeDesc left, ExprNodeDesc right) throws SemanticException {
return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
FunctionRegistry.getFunctionInfo(fn).getGenericUDF(), Lists.newArrayList(left, right));
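Widening makeBinaryPredicate to public lets ReplicationSemanticAnalyzer build
drop-partition predicates, as genPartSpecs does further down. A minimal sketch
of what it produces for a single partition key (the column name "b" and value
"2" are illustrative; per the FIXME in genPartSpecs, values are treated as
strings for now):

    // Build the expression (b = '2'), usable in DropTableDesc's partSpecs.
    PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo("string");
    ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, "b", null, true);
    ExprNodeGenericFuncDesc expr =
        DDLSemanticAnalyzer.makeBinaryPredicate("=", column, new ExprNodeConstantDesc(pti, "2"));
    // Additional key-value pairs are chained with "and":
    //   expr = DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, nextEq);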
http://git-wip-us.apache.org/repos/asf/hive/blob/ab9b2192/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 8725015..69ccda7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -27,13 +27,17 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
import org.apache.hadoop.hive.metastore.messaging.EventUtils;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -45,18 +49,28 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.IOUtils;
import javax.annotation.Nullable;
+import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -77,6 +91,122 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
+ public static final String DUMPMETADATA = "_dumpmetadata";
+
+ public enum DUMPTYPE {
+ BOOTSTRAP("BOOTSTRAP"),
+ INCREMENTAL("INCREMENTAL"),
+ EVENT_CREATE_TABLE("EVENT_CREATE_TABLE"),
+ EVENT_ADD_PARTITION("EVENT_ADD_PARTITION"),
+ EVENT_DROP_TABLE("EVENT_DROP_TABLE"),
+ EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"),
+ EVENT_UNKNOWN("EVENT_UNKNOWN");
+
+ String type = null;
+ DUMPTYPE(String type) {
+ this.type = type;
+ }
+
+ @Override
+ public String toString(){
+ return type;
+ }
+
+ };
+
+ public class DumpMetaData {
+ // wrapper class for reading and writing metadata about a dump
+ // responsible for _dumpmetadata files
+
+ private DUMPTYPE dumpType;
+ private Long eventFrom = null;
+ private Long eventTo = null;
+ private String payload = null;
+ private boolean initialized = false;
+
+ private final Path dumpRoot;
+ private final Path dumpFile;
+
+ public DumpMetaData(Path dumpRoot) {
+ this.dumpRoot = dumpRoot;
+ dumpFile = new Path(dumpRoot, DUMPMETADATA);
+ }
+
+ public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo){
+ this(dumpRoot);
+ setDump(lvl,eventFrom,eventTo);
+ }
+
+ public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo){
+ this.dumpType = lvl;
+ this.eventFrom = eventFrom;
+ this.eventTo = eventTo;
+ this.initialized = true;
+ }
+
+ public void loadDumpFromFile() throws SemanticException {
+ try {
+ // read from dumpfile and instantiate self
+ FileSystem fs = dumpFile.getFileSystem(conf);
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
+ String line = null;
+ if ( (line = br.readLine()) != null){
+ String[] lineContents = line.split("\t",4);
+ setDump(DUMPTYPE.valueOf(lineContents[0]),Long.valueOf(lineContents[1]),Long.valueOf(lineContents[2]));
+ setPayload(lineContents[3].equals(Utilities.nullStringOutput) ? null : lineContents[3]);
+ } else {
+ throw new IOException("Unable to read valid values from dumpFile:"+dumpFile.toUri().toString());
+ }
+ } catch (IOException ioe){
+ throw new SemanticException(ioe);
+ }
+ }
+
+ public DUMPTYPE getDumpType() throws SemanticException {
+ initializeIfNot();
+ return this.dumpType;
+ }
+
+ public String getPayload() throws SemanticException {
+ initializeIfNot();
+ return this.payload;
+ }
+
+ public void setPayload(String payload) {
+ this.payload = payload;
+ }
+
+ public Long getEventFrom() throws SemanticException {
+ initializeIfNot();
+ return eventFrom;
+ }
+
+ public Long getEventTo() throws SemanticException {
+ initializeIfNot();
+ return eventTo;
+ }
+
+ public Path getDumpFilePath() {
+ return dumpFile;
+ }
+
+ public boolean isIncrementalDump() throws SemanticException {
+ initializeIfNot();
+ return (this.dumpType == DUMPTYPE.INCREMENTAL);
+ }
+
+ private void initializeIfNot() throws SemanticException {
+ if (!initialized){
+ loadDumpFromFile();
+ }
+ }
+
+ public void write() throws SemanticException {
+ writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(), payload), dumpFile);
+ }
+
+ }
+
public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
}
@@ -155,13 +285,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
+ String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize));
String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
Path dumpRoot = new Path(replRoot, getNextDumpDir());
- Path dumpMetadata = new Path(dumpRoot,"_dumpmetadata");
- String lastReplId;
+ DumpMetaData dmd = new DumpMetaData(dumpRoot);
+ Long lastReplId;
try {
if (eventFrom == null){
// bootstrap case
- String bootDumpBeginReplId =
- String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+ Long bootDumpBeginReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
for (String dbName : matchesDb(dbNameOrPattern)) {
LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
@@ -171,8 +300,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
dumpTbl(ast, dbName, tblName, dbRoot);
}
}
- String bootDumpEndReplId =
- String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+ Long bootDumpEndReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
LOG.info("Bootstrap object dump phase took from {} to {}",bootDumpBeginReplId, bootDumpEndReplId);
// Now that bootstrap has dumped all objects related, we have to account for the changes
@@ -184,8 +312,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
EventUtils.MSClientNotificationFetcher evFetcher =
new EventUtils.MSClientNotificationFetcher(db.getMSC());
EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
- evFetcher, Long.valueOf(bootDumpBeginReplId),
- Ints.checkedCast(Long.valueOf(bootDumpEndReplId) - Long.valueOf(bootDumpBeginReplId) + 1),
+ evFetcher, bootDumpBeginReplId,
+ Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
evFilter );
// Now we consolidate all the events that happened during the objdump into the objdump
@@ -194,7 +322,10 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
// FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
}
- LOG.info("Consolidation done, preparing to return {},{}",dumpRoot.toUri(),bootDumpEndReplId);
+ LOG.info("Consolidation done, preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
+ dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId);
+ dmd.write();
+
// Set the correct last repl id to return to the user
lastReplId = bootDumpEndReplId;
} else {
@@ -230,13 +361,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
dumpEvent(ev,evRoot);
}
- LOG.info("Done dumping events, preparing to return {},{}",dumpRoot.toUri(),eventTo);
- List<String> vals;
- writeOutput(Arrays.asList("event", String.valueOf(eventFrom), String.valueOf(eventTo)), dumpMetadata);
+ LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), eventTo);
+ writeOutput(Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(eventTo)), dmd.getDumpFilePath());
+ dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo);
+ dmd.write();
// Set the correct last repl id to return to the user
- lastReplId = String.valueOf(eventTo);
+ lastReplId = eventTo;
}
- prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), lastReplId), dumpSchema);
+ prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
setFetchTask(createFetchTask(dumpSchema));
} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
@@ -248,7 +380,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception {
long evid = ev.getEventId();
String evidStr = String.valueOf(evid);
- ReplicationSpec replicationSpec = getNewReplicationSpec(evidStr, evidStr);
+ ReplicationSpec replicationSpec = getNewEventOnlyReplicationSpec(evidStr);
MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
switch (ev.getEventType()){
case MessageFactory.CREATE_TABLE_EVENT : {
@@ -278,7 +410,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// we will, however, do so here, now, for dev/debug's sake.
Path dataPath = new Path(evRoot,"data");
rootTasks.add(ReplCopyTask.getDumpCopyTask(replicationSpec, qlMdTable.getPath(), dataPath , conf));
-
+ (new DumpMetaData(evRoot,DUMPTYPE.EVENT_CREATE_TABLE,evid,evid)).write();
break;
}
case MessageFactory.ADD_PARTITION_EVENT : {
@@ -334,11 +466,29 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
replicationSpec, qlPtn.getPartitionPath(), ptnDataPath, conf));
}
+ (new DumpMetaData(evRoot,DUMPTYPE.EVENT_ADD_PARTITION,evid,evid)).write();
+ break;
+ }
+ case MessageFactory.DROP_TABLE_EVENT : {
+ LOG.info("Processing#{} DROP_TABLE message : {}",ev.getEventId(),ev.getMessage());
+ DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_DROP_TABLE,evid,evid);
+ dmd.setPayload(ev.getMessage());
+ dmd.write();
+ break;
+ }
+ case MessageFactory.DROP_PARTITION_EVENT : {
+ LOG.info("Processing#{} DROP_PARTITION message : {}",ev.getEventId(),ev.getMessage());
+ DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_DROP_PARTITION,evid,evid);
+ dmd.setPayload(ev.getMessage());
+ dmd.write();
break;
}
+ // TODO : handle other event types
default:
- LOG.info("Skipping processing#{} message : {}",ev.getEventId(), ev.getMessage());
- // TODO : handle other event types
+ LOG.info("Dummy processing#{} message : {}",ev.getEventId(), ev.getMessage());
+ DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_UNKNOWN,evid,evid);
+ dmd.setPayload(ev.getMessage());
+ dmd.write();
break;
}
@@ -426,14 +576,44 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
/*
* Example dump dirs we need to be able to handle :
*
- * for: hive.repl.rootdir = staging/ Then, repl dumps will be created in staging/<dumpdir>
+ * for: hive.repl.rootdir = staging/
+ * Then, repl dumps will be created in staging/<dumpdir>
*
- * single-db-dump: staging/blah12345 blah12345/ default/ _metadata tbl1/ _metadata dt=20160907/
- * _files tbl2/ tbl3/ unptn_tbl/ _metadata _files
+ * single-db-dump: staging/blah12345 will contain a db dir for the db specified
+ * blah12345/
+ * default/
+ * _metadata
+ * tbl1/
+ * _metadata
+ * dt=20160907/
+ * _files
+ * tbl2/
+ * tbl3/
+ * unptn_tbl/
+ * _metadata
+ * _files
*
- * multi-db-dump: staging/bar12347 staging/ bar12347/ default/ ... sales/ ...
+ * multi-db-dump: staging/bar12347 will contain dirs for each db covered
+ * staging/
+ * bar12347/
+ * default/
+ * ...
+ * sales/
+ * ...
*
- * single table-dump: staging/baz123 staging/ baz123/ _metadata dt=20150931/ _files
+ * single table-dump: staging/baz123 will contain a table object dump inside
+ * staging/
+ * baz123/
+ * _metadata
+ * dt=20150931/
+ * _files
+ *
+ * incremental dump : staging/blue123 will contain dirs for each event inside.
+ * staging/
+ * blue123/
+ * 34/
+ * 35/
+ * 36/
*/
private void analyzeReplLoad(ASTNode ast) throws SemanticException {
LOG.debug("ReplSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "."
@@ -458,22 +638,24 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// db name, and with a _metadata file in each, and table dirs inside that.
// b) It can be a table dump dir, in which case we expect a _metadata dump of
// a table in question in the dir, and individual ptn dir hierarchy.
- // c) A dump can be an event-level dump, which means we have several subdirs
+ // c) A dump can be an incremental dump, which means we have several subdirs
// each of which have the evid as the dir name, and each of which correspond
// to a event-level dump. Currently, only CREATE_TABLE and ADD_PARTITION are
// handled, so all of these dumps will be at a table/ptn level.
- // For incremental repl, eventually, we can have individual events which can
+ // For incremental repl, we will have individual events which can
// be other things like roles and fns as well.
+ // At this point, all dump dirs should contain a _dumpmetadata file that
+ // tells us what is inside that dumpdir.
+
+ DumpMetaData dmd = new DumpMetaData(loadPath);
boolean evDump = false;
- Path dumpMetadata = new Path(loadPath, "_dumpmetadata");
- // TODO : only event dumps currently have _dumpmetadata - this might change. Generify.
- if (fs.exists(dumpMetadata)){
- LOG.debug("{} exists, this is a event dump", dumpMetadata);
+ if (dmd.isIncrementalDump()){
+ LOG.debug("{} contains an incremental dump", loadPath);
evDump = true;
} else {
- LOG.debug("{} does not exist, this is an object dump", dumpMetadata);
+ LOG.debug("{} contains an bootstrap dump", loadPath);
}
if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
@@ -533,6 +715,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// \ /
// --->ev1.task3--
//
+ // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the
+ // entire chain
+
List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), taskChainTail);
LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0);
@@ -562,10 +747,98 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private List<Task<? extends Serializable>> analyzeEventLoad(
String dbName, String tblName, String locn,
- Task<? extends Serializable> precursor ) throws SemanticException {
+ Task<? extends Serializable> precursor) throws SemanticException {
// Currently handles only create-tbl & insert-ptn, since only those are dumped
// As we add more event types, this will expand.
- return analyzeTableLoad(dbName, tblName, locn, precursor);
+ DumpMetaData dmd = new DumpMetaData(new Path(locn));
+ switch (dmd.getDumpType()) {
+ case EVENT_CREATE_TABLE: {
+ return analyzeTableLoad(dbName, tblName, locn, precursor);
+ }
+ case EVENT_ADD_PARTITION: {
+ return analyzeTableLoad(dbName, tblName, locn, precursor);
+ }
+ case EVENT_DROP_TABLE: {
+ MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
+ DropTableMessage dropTableMessage = md.getDropTableMessage(dmd.getPayload());
+ DropTableDesc dropTableDesc = new DropTableDesc(
+ dbName + "." + (tblName == null ? dropTableMessage.getTable() : tblName),
+ null, true, true, getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
+ Task<DDLWork> dropTableTask = TaskFactory.get(new DDLWork(inputs, outputs, dropTableDesc), conf);
+ if (precursor != null){
+ precursor.addDependentTask(dropTableTask);
+ }
+ List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+ tasks.add(dropTableTask);
+ LOG.debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
+ return tasks;
+ }
+ case EVENT_DROP_PARTITION: {
+ MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
+ DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(dmd.getPayload());
+ Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = genPartSpecs(dropPartitionMessage.getPartitions());
+ if (partSpecs.size() > 0){
+ DropTableDesc dropPtnDesc = new DropTableDesc(
+ dbName + "." + (tblName == null ? dropPartitionMessage.getTable() : tblName), partSpecs,
+ null, true, getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
+ Task<DDLWork> dropPtnTask = TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf);
+ if (precursor != null){
+ precursor.addDependentTask(dropPtnTask);
+ }
+ List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+ tasks.add(dropPtnTask);
+ LOG.debug("Added drop ptn task : {}:{},{}",
+ dropPtnTask.getId(), dropPtnDesc.getTableName(), dropPartitionMessage.getPartitions());
+ return tasks;
+ } else {
+ throw new SemanticException("DROP PARTITION EVENT does not return any part descs for event message :"+dmd.getPayload());
+ }
+ }
+ case EVENT_UNKNOWN: {
+ break;
+ }
+ default: {
+ break;
+ }
+ }
+ return null;
+ }
+
+ private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(List<Map<String, String>> partitions) throws SemanticException {
+ Map<Integer,List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<Integer,List<ExprNodeGenericFuncDesc>>();
+
+ int partPrefixLength = 0;
+ if ((partitions != null) && (partitions.size() > 0)) {
+ partPrefixLength = partitions.get(0).size();
+ // pick the length of the first ptn, we expect all ptns listed to have the same number of key-vals.
+ }
+ List<ExprNodeGenericFuncDesc> ptnDescs = new ArrayList<ExprNodeGenericFuncDesc>();
+ for (Map<String,String> ptn : partitions) {
+ // convert each key-value-map to appropriate expression.
+
+ ExprNodeGenericFuncDesc expr = null;
+ for (Map.Entry<String,String> kvp : ptn.entrySet()) {
+ String key = kvp.getKey();
+ Object val = kvp.getValue();
+ // FIXME : bug here, value is being placed as a String, but should actually be the underlying type
+ // as converted to it by looking at the table's col schema. To do that, however, we need the
+ // tableObjJson from the DropTableMessage. So, currently, this will only work for partitions for
+ // which the partition keys are all strings. So, for now, we hardcode it, but we need to fix this.
+ String type = "string";
+
+ PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
+ ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
+ ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate("=", column, new ExprNodeConstantDesc(pti, val));
+ expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
+ }
+ if (expr != null) {
+ ptnDescs.add(expr);
+ }
+ }
+ if (ptnDescs.size() > 0){
+ partSpecs.put(partPrefixLength,ptnDescs);
+ }
+ return partSpecs;
}
private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
@@ -709,8 +982,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
- LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}" ,
- String.valueOf(replLastId),ctx.getResFile());
+ LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
+ String.valueOf(replLastId), ctx.getResFile());
}
private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
@@ -752,10 +1025,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
+ // Use for specifying object state as well as event state
private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException {
return new ReplicationSpec(true, false, evState, objState, false, true);
}
+ // Use for replication states focussed on event only, where the obj state will be the event state
+ private ReplicationSpec getNewEventOnlyReplicationSpec(String evState) throws SemanticException {
+ return getNewReplicationSpec(evState,evState);
+ }
+
private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
throws HiveException {
if (tblPattern == null) {
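----------------------------------------------------------------------
For reference, DumpMetaData.write() serializes _dumpmetadata as a single
tab-separated line, which loadDumpFromFile() reads back with
line.split("\t", 4). A hypothetical line for a drop-table event dump
(<TAB> marks a tab character; the JSON message payload is elided):

    EVENT_DROP_TABLE<TAB>42<TAB>42<TAB>{...drop-table JSON message...}

A null payload round-trips via the Utilities.nullStringOutput marker that
loadDumpFromFile() compares against.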