Posted to commits@hive.apache.org by kh...@apache.org on 2016/12/19 23:08:39 UTC

hive git commit: HIVE-15426 : Fix order guarantee of event executions for REPL LOAD (Sushanth Sowmyan, reviewed by Thejas Nair)

Repository: hive
Updated Branches:
  refs/heads/master bbd99ed60 -> 268c60ebb


HIVE-15426 : Fix order guarantee of event executions for REPL LOAD (Sushanth Sowmyan, reviewed by Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/268c60eb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/268c60eb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/268c60eb

Branch: refs/heads/master
Commit: 268c60ebb22691f5410223af2aafcd06ea6a2273
Parents: bbd99ed
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Mon Dec 19 15:07:53 2016 -0800
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Mon Dec 19 15:08:35 2016 -0800

----------------------------------------------------------------------
 .../hive/ql/TestReplicationScenarios.java       | 51 +++++++++---
 .../hive/ql/parse/ImportSemanticAnalyzer.java   | 20 ++---
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 85 ++++++++++++++------
 3 files changed, 111 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/268c60eb/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 9b7014b..3ac5ba7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -163,7 +163,13 @@ public class TestReplicationScenarios {
     advanceDumpDir();
     run("REPL DUMP " + dbName);
     String replDumpLocn = getResult(0,0);
-    run("REPL LOAD " + dbName + "_dupe FROM '"+replDumpLocn+"'");
+    String replDumpId = getResult(0,1,true);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    run("REPL STATUS " + dbName + "_dupe");
+    verifyResults(new String[] {replDumpId});
 
     run("SELECT * from " + dbName + "_dupe.unptned");
     verifyResults(unptn_data);
@@ -230,7 +236,6 @@ public class TestReplicationScenarios {
     run("SELECT a from " + dbName + ".ptned WHERE b=2");
     verifyResults(ptn_data_2);
 
-    // verified up to here.
     run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
     run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=1)");
     run("SELECT a from " + dbName + ".ptned_late WHERE b=1");
@@ -244,19 +249,28 @@ public class TestReplicationScenarios {
     String incrementalDumpLocn = getResult(0,0);
     String incrementalDumpId = getResult(0,1,true);
     LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
 
+    run("REPL STATUS " + dbName + "_dupe");
+//    verifyResults(new String[] {incrementalDumpId});
+    // TODO: this will currently not work because we need to add in ALTER_DB support into this
+    // and queue in a dummy ALTER_DB to update the repl.last.id on the last event of every
+    // incremental dump. Currently, the dump id fetched will be the last dump id at the time
+    // the db was created from the bootstrap export dump
+
     run("SELECT * from " + dbName + "_dupe.unptned_empty");
     verifyResults(empty);
     run("SELECT a from " + dbName + ".ptned_empty");
     verifyResults(empty);
 
 
-//  this does not work because LOAD DATA LOCAL INPATH into an unptned table seems
-//  to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after
-//  fixing that.
 //    run("SELECT * from " + dbName + "_dupe.unptned");
 //    verifyResults(unptn_data);
+    // TODO: this does not work because LOAD DATA LOCAL INPATH into an unptned table seems
+    // to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after
+    // fixing that.
     run("SELECT * from " + dbName + "_dupe.unptned_late");
     verifyResults(unptn_data);
 
@@ -291,6 +305,16 @@ public class TestReplicationScenarios {
   }
 
   private void verifyResults(String[] data) throws IOException {
+    List<String> results = getOutput();
+    LOG.info("Expecting {}",data);
+    LOG.info("Got {}",results);
+    assertEquals(data.length,results.size());
+    for (int i = 0; i < data.length; i++){
+      assertEquals(data[i],results.get(i));
+    }
+  }
+
+  private List<String> getOutput() throws IOException {
     List<String> results = new ArrayList<String>();
     try {
       driver.getResults(results);
@@ -298,16 +322,23 @@ public class TestReplicationScenarios {
       LOG.warn(e.getMessage(),e);
       throw new RuntimeException(e);
     }
-    LOG.info("Expecting {}",data);
-    LOG.info("Got {}",results);
-    assertEquals(data.length,results.size());
-    for (int i = 0; i < data.length; i++){
-      assertEquals(data[i],results.get(i));
+    return results;
+  }
+
+  private void printOutput() throws IOException {
+    for (String s : getOutput()){
+      LOG.info(s);
     }
   }
 
   private static void run(String cmd) throws RuntimeException {
+    try {
     run(cmd,false); // default arg-less run simply runs, and does not care about failure
+    } catch (AssertionError ae){
+      // Hive code has AssertionErrors in some cases - we want to record what happens
+      LOG.warn("AssertionError:",ae);
+      throw new RuntimeException(ae);
+    }
   }
 
   private static boolean run(String cmd, boolean errorOnFail) throws RuntimeException {
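
The run() wrapper above exists because, as its comment notes, Hive code throws
AssertionError in some cases; AssertionError extends Error rather than Exception,
so a plain catch (Exception e) would let such failures escape unlogged. A minimal,
self-contained sketch of the same pattern, with illustrative names rather than the
test's actual signatures:

    static void runLogged(Runnable cmd) {
      try {
        cmd.run();
      } catch (AssertionError ae) {
        // record the failure before rethrowing; the test uses LOG.warn here
        System.err.println("AssertionError: " + ae);
        throw new RuntimeException(ae);
      }
    }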

http://git-wip-us.apache.org/repos/asf/hive/blob/268c60eb/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index ce952c5..5561e06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -102,9 +102,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       String parsedDbName = null;
       LinkedHashMap<String, String> parsedPartSpec = new LinkedHashMap<String, String>();
 
-      // waitOnCreateDb determines whether or not non-existence of
-      // db is an error. For regular imports, it is.
-      boolean waitOnCreateDb = false;
+      // waitOnPrecursor determines whether or not the non-existence of
+      // a dependent object is an error. For regular imports, it is.
+      // For now, the only thing this affects is whether or not the
+      // db exists.
+      boolean waitOnPrecursor = false;
 
       for (int i = 1; i < ast.getChildCount(); ++i){
         ASTNode child = (ASTNode) ast.getChild(i);
@@ -133,7 +135,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       // parsing statement is now done, on to logic.
       tableExists = prepareImport(
-          isLocationSet, isExternalSet, isPartSpecSet, waitOnCreateDb,
+          isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
           parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
           new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx));
 
@@ -168,7 +170,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   public static boolean prepareImport(
-      boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnCreateDb,
+      boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnPrecursor,
       String parsedLocation, String parsedTableName, String parsedDbName,
       LinkedHashMap<String, String> parsedPartSpec,
       String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x
@@ -281,7 +283,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     } else {
       createReplImportTasks(
           tblDesc, partitionDescs,
-          isPartSpecSet, replicationSpec, waitOnCreateDb, table,
+          isPartSpecSet, replicationSpec, waitOnPrecursor, table,
           fromURI, fs, wh, x);
     }
     return tableExists;
@@ -799,7 +801,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   private static void createReplImportTasks(
       CreateTableDesc tblDesc,
       List<AddPartitionDesc> partitionDescs,
-      boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnCreateDb,
+      boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor,
       Table table, URI fromURI, FileSystem fs, Warehouse wh,
       EximUtil.SemanticAnalyzerWrapperContext x)
       throws HiveException, URISyntaxException, IOException, MetaException {
@@ -829,12 +831,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     // defaults and do not error out in that case.
     Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
     if (parentDb == null){
-      if (!waitOnCreateDb){
+      if (!waitOnPrecursor){
         throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName()));
       }
     }
     if (tblDesc.getLocation() == null) {
-      if (!waitOnCreateDb){
+      if (!waitOnPrecursor){
         tblDesc.setLocation(wh.getTablePath(parentDb, tblDesc.getTableName()).toString());
       } else {
         tblDesc.setLocation(
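
The waitOnCreateDb to waitOnPrecursor rename above captures the contract that REPL
LOAD depends on: at analysis time a dependent object (for now, only the parent
database) may not exist yet, because the task that creates it is merely queued
earlier in the same task chain. A simplified, hypothetical illustration of that
contract; checkParentDb and its parameters are stand-ins, not Hive's actual API:

    // waitOnPrecursor == false: regular IMPORT, a missing db is a hard error
    // waitOnPrecursor == true : REPL LOAD, a precursor task will create the db,
    //                           so derive defaults instead of reading the db object
    static void checkParentDb(Object parentDb, String dbName, boolean waitOnPrecursor) {
      if (parentDb == null && !waitOnPrecursor) {
        throw new IllegalStateException("database does not exist: " + dbName);
      }
    }

In the real code the error path throws SemanticException with
ErrorMsg.DATABASE_NOT_EXISTS, and the waitOnPrecursor branch computes the table
location from warehouse defaults rather than from the (possibly null) parent database.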

http://git-wip-us.apache.org/repos/asf/hive/blob/268c60eb/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 6fff98d..8725015 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.io.IOUtils;
 
@@ -205,7 +206,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
 
         Integer maxRange = Ints.checkedCast(eventTo - eventFrom + 1);
-        batchSize = 15;
         if (batchSize == null){
           batchSize = maxRange;
         } else {
@@ -478,7 +478,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
         // not an event dump, and table name pattern specified, this has to be a tbl-level dump
-        analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null);
+        rootTasks.addAll(analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null));
         return;
       }
 
@@ -512,13 +512,45 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
       } else {
         // event dump, each subdir is an individual event dump.
+        Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork(), conf);
+        Task<? extends Serializable> taskChainTail = evTaskRoot;
+        int evstage = 0;
         for (FileStatus dir : dirsInLoadPath){
+          LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
          // event loads will behave similarly to table loads, with one crucial difference:
          // precursor order is strict, and each event must be processed after the previous one.
-          LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
-          analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), null);
-          // FIXME: we should have a strict order of execution so that each event's tasks occur linearly
+          // The way we handle this strict order is as follows:
+          // First, we start with taskChainTail pointing at a dummy no-op task (a DependencyCollectionTask)
+          // at the head of our event chain. For each event we process, we tell analyzeTableLoad to
+          // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks
+          // and introduce a new barrier task(also a DependencyCollectionTask) which depends on all
+          // these tasks. Then, this barrier task becomes our new taskChainTail. Thus, we get a set of
+          // tasks as follows:
+          //
+          //                 --->ev1.task1--                          --->ev2.task1--
+          //                /               \                        /               \
+          //  evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail
+          //                \               /
+          //                 --->ev1.task3--
+          //
+          List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
+              dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), taskChainTail);
+          LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0);
+          if ((evTasks != null) && (!evTasks.isEmpty())){
+            Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
+            for (Task<? extends Serializable> t : evTasks){
+              t.addDependentTask(barrierTask);
+              LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
+                  t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
+            }
+            LOG.debug("Updated taskChainTail from {}{} to {}{}",
+                taskChainTail.getClass(),taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
+            taskChainTail = barrierTask;
+            evstage++;
+          }
         }
+        LOG.debug("added evTaskRoot {}:{}",evTaskRoot.getClass(),evTaskRoot.getId());
+        rootTasks.add(evTaskRoot);
       }
 
     } catch (Exception e) {
@@ -528,9 +560,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   }
 
-  private void analyzeEventLoad(String dbNameOrPattern, String tblNameOrPattern,
-      FileSystem fs, FileStatus dir) throws SemanticException {
-
+  private List<Task<? extends Serializable>> analyzeEventLoad(
+      String dbName, String tblName, String locn,
+      Task<? extends  Serializable> precursor ) throws SemanticException {
+    // Currently handles only create-tbl & insert-ptn, since only those are dumped
+    // As we add more event types, this will expand.
+    return analyzeTableLoad(dbName, tblName, locn, precursor);
   }
 
   private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
@@ -581,14 +616,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs));
 
       for (FileStatus tableDir : dirsInDbPath) {
-        analyzeTableLoad(dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
+        analyzeTableLoad(
+            dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
       }
     } catch (Exception e) {
       throw new SemanticException(e);
     }
   }
 
-  private void analyzeTableLoad(String dbName, String tblName, String locn,
+  private List<Task<? extends Serializable>> analyzeTableLoad(
+      String dbName, String tblName, String locn,
       Task<? extends Serializable> precursor) throws SemanticException {
     // Path being passed to us is a table dump location. We go ahead and load it in as needed.
     // If tblName is null, then we default to the table name specified in _metadata, which is good.
@@ -607,27 +644,23 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       LinkedHashMap<String, String> parsedPartSpec = null;
       // no location for repl imports
       String parsedLocation = null;
-      boolean waitOnCreateDb = false;
-      List<Task<? extends Serializable>> importTasks = null;
-      if (precursor == null) {
-        importTasks = rootTasks;
-        waitOnCreateDb = false;
-      } else {
-        importTasks = new ArrayList<Task<? extends Serializable>>();
-        waitOnCreateDb = true;
-      }
+      List<Task<? extends Serializable>> importTasks = new ArrayList<Task<? extends Serializable>>();
+
       EximUtil.SemanticAnalyzerWrapperContext x =
           new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG,
               ctx);
       ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
-          waitOnCreateDb, parsedLocation, tblName, dbName, parsedPartSpec, locn, x);
+          (precursor != null), parsedLocation, tblName, dbName, parsedPartSpec, locn, x);
 
       if (precursor != null) {
         for (Task<? extends Serializable> t : importTasks) {
           precursor.addDependentTask(t);
+          LOG.debug("Added {}:{} as a precursor of {}:{}",
+              precursor.getClass(), precursor.getId(), t.getClass(), t.getId());
         }
       }
 
+      return importTasks;
     } catch (Exception e) {
       throw new SemanticException(e);
     }
@@ -655,8 +688,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         if (tbl != null) {
           inputs.add(new ReadEntity(tbl));
           Map<String, String> params = tbl.getParameters();
-          if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
-            replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+          if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+            replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
           }
         }
       } else {
@@ -665,8 +698,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         if (database != null) {
           inputs.add(new ReadEntity(database));
           Map<String, String> params = database.getParameters();
-          if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
-            replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+          if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+            replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
           }
         }
       }
@@ -675,9 +708,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
                                       // codes
     }
 
-    LOG.debug("RSTATUS: writing repl.last.id=" + String.valueOf(replLastId) + " out to "
-        + ctx.getResFile());
     prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}" ,
+        String.valueOf(replLastId),ctx.getResFile());
   }
 
   private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
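
The ASCII diagram in the ReplicationSemanticAnalyzer hunk is the core of the fix:
each event's tasks hang off the previous event's barrier, and a fresh barrier
collects them before the next event starts. A self-contained model of that
chaining; Task here is a stand-in for Hive's Task<? extends Serializable>, and the
barriers model no-op DependencyCollectionWork tasks, so this is an illustration
rather than Hive code:

    import java.util.ArrayList;
    import java.util.List;

    public class BarrierChainModel {
      static class Task {
        final String name;
        final List<Task> dependents = new ArrayList<>();
        Task(String name) { this.name = name; }
        void addDependentTask(Task t) { dependents.add(t); }
      }

      public static void main(String[] args) {
        Task evTaskRoot = new Task("evTaskRoot"); // dummy no-op barrier at the head
        Task taskChainTail = evTaskRoot;
        for (int ev = 1; ev <= 2; ev++) {
          // analyzeEventLoad would return these tasks, each already made
          // dependent on taskChainTail, so event N waits for event N-1
          List<Task> evTasks = new ArrayList<>();
          for (int i = 1; i <= 3; i++) {
            Task t = new Task("ev" + ev + ".task" + i);
            taskChainTail.addDependentTask(t);
            evTasks.add(t);
          }
          // a new barrier depends on every task of this event ...
          Task barrier = new Task("ev" + ev + ".barrierTask");
          for (Task t : evTasks) {
            t.addDependentTask(barrier);
          }
          // ... and becomes the tail the next event chains off
          taskChainTail = barrier;
        }
        System.out.println("chain ends at " + taskChainTail.name);
      }
    }

Only evTaskRoot is added to rootTasks; every other task is reachable through
dependent-task edges, which is what provides the strict event ordering that the
removed FIXME asked for.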