Posted to commits@hive.apache.org by kh...@apache.org on 2016/12/19 23:08:39 UTC
hive git commit: HIVE-15426 : Fix order guarantee of event executions for REPL LOAD (Sushanth Sowmyan, reviewed by Thejas Nair)
Repository: hive
Updated Branches:
refs/heads/master bbd99ed60 -> 268c60ebb
HIVE-15426 : Fix order guarantee of event executions for REPL LOAD (Sushanth Sowmyan, reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/268c60eb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/268c60eb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/268c60eb
Branch: refs/heads/master
Commit: 268c60ebb22691f5410223af2aafcd06ea6a2273
Parents: bbd99ed
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Mon Dec 19 15:07:53 2016 -0800
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Mon Dec 19 15:08:35 2016 -0800
----------------------------------------------------------------------
.../hive/ql/TestReplicationScenarios.java | 51 +++++++++---
.../hive/ql/parse/ImportSemanticAnalyzer.java | 20 ++---
.../ql/parse/ReplicationSemanticAnalyzer.java | 85 ++++++++++++++------
3 files changed, 111 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/268c60eb/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 9b7014b..3ac5ba7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -163,7 +163,13 @@ public class TestReplicationScenarios {
advanceDumpDir();
run("REPL DUMP " + dbName);
String replDumpLocn = getResult(0,0);
- run("REPL LOAD " + dbName + "_dupe FROM '"+replDumpLocn+"'");
+ String replDumpId = getResult(0,1,true);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+ printOutput();
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ run("REPL STATUS " + dbName + "_dupe");
+ verifyResults(new String[] {replDumpId});
run("SELECT * from " + dbName + "_dupe.unptned");
verifyResults(unptn_data);
@@ -230,7 +236,6 @@ public class TestReplicationScenarios {
run("SELECT a from " + dbName + ".ptned WHERE b=2");
verifyResults(ptn_data_2);
- // verified up to here.
run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=1)");
run("SELECT a from " + dbName + ".ptned_late WHERE b=1");
@@ -244,19 +249,28 @@ public class TestReplicationScenarios {
String incrementalDumpLocn = getResult(0,0);
String incrementalDumpId = getResult(0,1,true);
LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+ run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+ printOutput();
run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
+ run("REPL STATUS " + dbName + "_dupe");
+// verifyResults(new String[] {incrementalDumpId});
+ // TODO: this will currently not work because we need to add in ALTER_DB support into this
+ // and queue in a dummy ALTER_DB to update the repl.last.id on the last event of every
+ // incremental dump. Currently, the dump id fetched will be the last dump id at the time
+ // the db was created from the bootstrap export dump
+
run("SELECT * from " + dbName + "_dupe.unptned_empty");
verifyResults(empty);
run("SELECT a from " + dbName + ".ptned_empty");
verifyResults(empty);
-// this does not work because LOAD DATA LOCAL INPATH into an unptned table seems
-// to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after
-// fixing that.
// run("SELECT * from " + dbName + "_dupe.unptned");
// verifyResults(unptn_data);
+ // TODO : this does not work because LOAD DATA LOCAL INPATH into an unptned table seems
+ // to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after
+ // fixing that.
run("SELECT * from " + dbName + "_dupe.unptned_late");
verifyResults(unptn_data);
@@ -291,6 +305,16 @@ public class TestReplicationScenarios {
}
private void verifyResults(String[] data) throws IOException {
+ List<String> results = getOutput();
+ LOG.info("Expecting {}",data);
+ LOG.info("Got {}",results);
+ assertEquals(data.length,results.size());
+ for (int i = 0; i < data.length; i++){
+ assertEquals(data[i],results.get(i));
+ }
+ }
+
+ private List<String> getOutput() throws IOException {
List<String> results = new ArrayList<String>();
try {
driver.getResults(results);
@@ -298,16 +322,23 @@ public class TestReplicationScenarios {
LOG.warn(e.getMessage(),e);
throw new RuntimeException(e);
}
- LOG.info("Expecting {}",data);
- LOG.info("Got {}",results);
- assertEquals(data.length,results.size());
- for (int i = 0; i < data.length; i++){
- assertEquals(data[i],results.get(i));
+ return results;
+ }
+
+ private void printOutput() throws IOException {
+ for (String s : getOutput()){
+ LOG.info(s);
}
}
private static void run(String cmd) throws RuntimeException {
+ try {
run(cmd,false); // default arg-less run simply runs, and does not care about failure
+ } catch (AssertionError ae){
+ // Hive code has AssertionErrors in some cases - we want to record what happens
+ LOG.warn("AssertionError:",ae);
+ throw new RuntimeException(ae);
+ }
}
private static boolean run(String cmd, boolean errorOnFail) throws RuntimeException {
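
Pulling the pieces of the test together, the bootstrap round trip it exercises condenses to the sketch below (a paraphrase built from the test's own run/getResult/verifyResults helpers, not new API): REPL DUMP reports a dump location and an id, REPL LOAD replays that dump into a replica, and the REPL STATUS check added here asserts that the replica's last replication id matches what the dump reported. The incremental cycle repeats the same steps FROM the incremental dump location; its REPL STATUS assertion remains commented out per the TODO above.

  advanceDumpDir();
  run("REPL DUMP " + dbName);
  String replDumpLocn = getResult(0, 0);      // where the dump was written
  String replDumpId = getResult(0, 1, true);  // id of the last replicated event
  run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
  run("REPL STATUS " + dbName + "_dupe");
  verifyResults(new String[] {replDumpId});   // replica has caught up to the dump id
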
http://git-wip-us.apache.org/repos/asf/hive/blob/268c60eb/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index ce952c5..5561e06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -102,9 +102,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
String parsedDbName = null;
LinkedHashMap<String, String> parsedPartSpec = new LinkedHashMap<String, String>();
- // waitOnCreateDb determines whether or not non-existence of
- // db is an error. For regular imports, it is.
- boolean waitOnCreateDb = false;
+ // waitOnPrecursor determines whether or not non-existence of
+ // a dependent object is an error. For regular imports, it is.
+ // for now, the only thing this affects is whether or not the
+ // db exists.
+ boolean waitOnPrecursor = false;
for (int i = 1; i < ast.getChildCount(); ++i){
ASTNode child = (ASTNode) ast.getChild(i);
@@ -133,7 +135,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
// parsing statement is now done, on to logic.
tableExists = prepareImport(
- isLocationSet, isExternalSet, isPartSpecSet, waitOnCreateDb,
+ isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx));
@@ -168,7 +170,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
public static boolean prepareImport(
- boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnCreateDb,
+ boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnPrecursor,
String parsedLocation, String parsedTableName, String parsedDbName,
LinkedHashMap<String, String> parsedPartSpec,
String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x
@@ -281,7 +283,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
} else {
createReplImportTasks(
tblDesc, partitionDescs,
- isPartSpecSet, replicationSpec, waitOnCreateDb, table,
+ isPartSpecSet, replicationSpec, waitOnPrecursor, table,
fromURI, fs, wh, x);
}
return tableExists;
@@ -799,7 +801,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
private static void createReplImportTasks(
CreateTableDesc tblDesc,
List<AddPartitionDesc> partitionDescs,
- boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnCreateDb,
+ boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor,
Table table, URI fromURI, FileSystem fs, Warehouse wh,
EximUtil.SemanticAnalyzerWrapperContext x)
throws HiveException, URISyntaxException, IOException, MetaException {
@@ -829,12 +831,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
// defaults and do not error out in that case.
Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
if (parentDb == null){
- if (!waitOnCreateDb){
+ if (!waitOnPrecursor){
throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName()));
}
}
if (tblDesc.getLocation() == null) {
- if (!waitOnCreateDb){
+ if (!waitOnPrecursor){
tblDesc.setLocation(wh.getTablePath(parentDb, tblDesc.getTableName()).toString());
} else {
tblDesc.setLocation(
http://git-wip-us.apache.org/repos/asf/hive/blob/268c60eb/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 6fff98d..8725015 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.io.IOUtils;
@@ -205,7 +206,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
Integer maxRange = Ints.checkedCast(eventTo - eventFrom + 1);
- batchSize = 15;
if (batchSize == null){
batchSize = maxRange;
} else {
@@ -478,7 +478,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
// not an event dump, and table name pattern specified, this has to be a tbl-level dump
- analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null);
+ rootTasks.addAll(analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null));
return;
}
@@ -512,13 +512,45 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
} else {
// event dump, each subdir is an individual event dump.
+ Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork(), conf);
+ Task<? extends Serializable> taskChainTail = evTaskRoot;
+ int evstage = 0;
for (FileStatus dir : dirsInLoadPath){
+ LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
// event loads will behave similar to table loads, with one crucial difference
// precursor order is strict, and each event must be processed after the previous one.
- LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
- analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), null);
- // FIXME: we should have a strict order of execution so that each event's tasks occur linearly
+ // The way we handle this strict order is as follows:
+ // First, we start with a taskChainTail which is a dummy noop task (a DependencyCollectionTask)
+ // at the head of our event chain. For each event we process, we tell analyzeTableLoad to
+ // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks
+ // and introduce a new barrier task(also a DependencyCollectionTask) which depends on all
+ // these tasks. Then, this barrier task becomes our new taskChainTail. Thus, we get a set of
+ // tasks as follows:
+ //
+  //                --->ev1.task1--                          --->ev2.task1--
+  //               /               \                        /               \
+  // evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail
+  //               \               /
+  //                --->ev1.task3--
+ //
+ List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
+ dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), taskChainTail);
+ LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0);
+ if ((evTasks != null) && (!evTasks.isEmpty())){
+ Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
+ for (Task<? extends Serializable> t : evTasks){
+ t.addDependentTask(barrierTask);
+ LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
+ t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
+ }
+ LOG.debug("Updated taskChainTail from {}{} to {}{}",
+ taskChainTail.getClass(),taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
+ taskChainTail = barrierTask;
+ evstage++;
+ }
}
+ LOG.debug("added evTaskRoot {}:{}",evTaskRoot.getClass(),evTaskRoot.getId());
+ rootTasks.add(evTaskRoot);
}
} catch (Exception e) {
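
The chain construction above is the heart of the fix. Stripped of Hive's Task/TaskFactory machinery, the pattern reduces to the standalone sketch below; Node is a hypothetical stand-in for Hive's Task class, and only addDependentTask mirrors the real API.

  import java.util.ArrayList;
  import java.util.List;

  public class BarrierChainSketch {

    // Hypothetical stand-in for Hive's Task: a node that knows its dependents.
    static class Node {
      final String name;
      final List<Node> dependents = new ArrayList<Node>();
      Node(String name) { this.name = name; }
      void addDependentTask(Node t) { dependents.add(t); }
    }

    public static void main(String[] args) {
      Node evTaskRoot = new Node("evTaskRoot"); // dummy no-op head of the chain
      Node taskChainTail = evTaskRoot;
      for (int ev = 1; ev <= 3; ev++) {
        // All of this event's tasks hang off the current chain tail, so none
        // can start before every task of the previous event has finished.
        List<Node> evTasks = new ArrayList<Node>();
        for (int i = 1; i <= 2; i++) {
          Node t = new Node("ev" + ev + ".task" + i);
          taskChainTail.addDependentTask(t);
          evTasks.add(t);
        }
        // A fresh barrier depends on all of this event's tasks and then
        // becomes the new chain tail for the next event.
        Node barrier = new Node("ev" + ev + ".barrier");
        for (Node t : evTasks) {
          t.addDependentTask(barrier);
        }
        taskChainTail = barrier;
      }
      System.out.println("chain ends at " + taskChainTail.name);
    }
  }

Each barrier serializes the events while still letting the tasks within a single event run in parallel, which is exactly the order guarantee the commit message describes.
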
@@ -528,9 +560,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
- private void analyzeEventLoad(String dbNameOrPattern, String tblNameOrPattern,
- FileSystem fs, FileStatus dir) throws SemanticException {
-
+ private List<Task<? extends Serializable>> analyzeEventLoad(
+ String dbName, String tblName, String locn,
+ Task<? extends Serializable> precursor ) throws SemanticException {
+ // Currently handles only create-tbl & insert-ptn, since only those are dumped
+ // As we add more event types, this will expand.
+ return analyzeTableLoad(dbName, tblName, locn, precursor);
}
private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
@@ -581,14 +616,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs));
for (FileStatus tableDir : dirsInDbPath) {
- analyzeTableLoad(dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
+ analyzeTableLoad(
+ dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
}
} catch (Exception e) {
throw new SemanticException(e);
}
}
- private void analyzeTableLoad(String dbName, String tblName, String locn,
+ private List<Task<? extends Serializable>> analyzeTableLoad(
+ String dbName, String tblName, String locn,
Task<? extends Serializable> precursor) throws SemanticException {
// Path being passed to us is a table dump location. We go ahead and load it in as needed.
// If tblName is null, then we default to the table name specified in _metadata, which is good.
@@ -607,27 +644,23 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
LinkedHashMap<String, String> parsedPartSpec = null;
// no location for repl imports
String parsedLocation = null;
- boolean waitOnCreateDb = false;
- List<Task<? extends Serializable>> importTasks = null;
- if (precursor == null) {
- importTasks = rootTasks;
- waitOnCreateDb = false;
- } else {
- importTasks = new ArrayList<Task<? extends Serializable>>();
- waitOnCreateDb = true;
- }
+ List<Task<? extends Serializable>> importTasks = new ArrayList<Task<? extends Serializable>>();
+
EximUtil.SemanticAnalyzerWrapperContext x =
new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG,
ctx);
ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
- waitOnCreateDb, parsedLocation, tblName, dbName, parsedPartSpec, locn, x);
+ (precursor != null), parsedLocation, tblName, dbName, parsedPartSpec, locn, x);
if (precursor != null) {
for (Task<? extends Serializable> t : importTasks) {
precursor.addDependentTask(t);
+ LOG.debug("Added {}:{} as a precursor of {}:{}",
+ precursor.getClass(), precursor.getId(), t.getClass(), t.getId());
}
}
+ return importTasks;
} catch (Exception e) {
throw new SemanticException(e);
}
@@ -655,8 +688,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
if (tbl != null) {
inputs.add(new ReadEntity(tbl));
Map<String, String> params = tbl.getParameters();
- if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
- replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+ if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+ replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
}
}
} else {
@@ -665,8 +698,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
if (database != null) {
inputs.add(new ReadEntity(database));
Map<String, String> params = database.getParameters();
- if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
- replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+ if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+ replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
}
}
}
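
The two hunks above fix a subtle lookup bug: Map.containsKey takes Object, so passing the ReplicationSpec.KEY enum constant itself compiles fine but can never match a Map<String, String> whose keys are strings. A self-contained illustration follows; the enum here is a simplified stand-in, assuming KEY's toString() yields the actual parameter name, as the surrounding repl.last.id logging suggests.

  import java.util.HashMap;
  import java.util.Map;

  public class EnumKeyPitfall {

    // Simplified stand-in for ReplicationSpec.KEY.
    enum KEY {
      CURR_STATE_ID;
      @Override
      public String toString() { return "repl.last.id"; }
    }

    public static void main(String[] args) {
      Map<String, String> params = new HashMap<String, String>();
      params.put(KEY.CURR_STATE_ID.toString(), "42");

      // containsKey(Object) happily accepts the enum constant, but an enum
      // instance never equals a String, so the lookup silently misses:
      System.out.println(params.containsKey(KEY.CURR_STATE_ID));            // false
      System.out.println(params.containsKey(KEY.CURR_STATE_ID.toString())); // true
    }
  }
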
@@ -675,9 +708,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// codes
}
- LOG.debug("RSTATUS: writing repl.last.id=" + String.valueOf(replLastId) + " out to "
- + ctx.getResFile());
prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+ LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}" ,
+ String.valueOf(replLastId),ctx.getResFile());
}
private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
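
The final hunk also moves the debug line after prepareReturnValues and switches it from string concatenation to parameterized {} placeholders, so the message is only formatted when debug logging is actually enabled. A minimal illustration (hypothetical class, assuming the SLF4J-style API that the surrounding {} calls already use):

  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  public class LogStyleSketch {
    private static final Logger LOG = LoggerFactory.getLogger(LogStyleSketch.class);

    static void writeStatus(String replLastId, String resFile) {
      // Concatenation builds the full message even when DEBUG is disabled:
      LOG.debug("writing repl.last.id=" + replLastId + " out to " + resFile);
      // Placeholders defer formatting until the logger actually emits:
      LOG.debug("writing repl.last.id={} out to {}", replLastId, resFile);
    }
  }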