Posted to commits@hive.apache.org by th...@apache.org on 2017/05/09 06:14:28 UTC

hive git commit: HIVE-16530 : Add HS2 operation logs and improve logs for REPL commands (Sankar Hariappan via Thejas Nair)

Repository: hive
Updated Branches:
  refs/heads/master db7b57cd9 -> d6db6ffff


HIVE-16530 : Add HS2 operation logs and improve logs for REPL commands (Sankar Hariappan via Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d6db6fff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d6db6fff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d6db6fff

Branch: refs/heads/master
Commit: d6db6ffff630bf377b157e9e971d48e1a6d10209
Parents: db7b57c
Author: Sankar Hariappan <ma...@gmail.com>
Authored: Mon May 8 23:14:19 2017 -0700
Committer: Thejas M Nair <th...@hortonworks.com>
Committed: Mon May 8 23:14:19 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/log/LogDivertAppender.java   |  2 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 49 ++++++++++++++++++--
 2 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
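Context for the diff below: LogDivertAppender decides which loggers' output gets diverted into the HiveServer2 operation log (the log a client can fetch for a running query) by matching logger names against executionIncludeNamePattern. The first hunk adds "ReplState" to that pattern, so messages emitted through a logger named "ReplState" now reach the client at EXECUTION verbosity. A minimal, self-contained sketch of that name check; String.join stands in for the Guava Joiner the real class uses, the class-name constants are written out, and the class name IncludePatternSketch is made up for illustration:

    import java.util.regex.Pattern;

    public class IncludePatternSketch {
      public static void main(String[] args) {
        // Same alternation the patched LogDivertAppender builds with
        // Guava's Joiner, now including "ReplState".
        Pattern executionIncludeNamePattern = Pattern.compile(String.join("|",
            "org.apache.hadoop.mapreduce.JobSubmitter",
            "org.apache.hadoop.mapreduce.Job", "SessionState", "ReplState",
            "org.apache.hadoop.hive.ql.exec.Task",
            "org.apache.hadoop.hive.ql.Driver",
            "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"));

        // A logger named "ReplState" matches, so its events are diverted
        // into the HS2 operation log.
        System.out.println(executionIncludeNamePattern.matcher("ReplState").matches()); // true
      }
    }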


http://git-wip-us.apache.org/repos/asf/hive/blob/d6db6fff/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
index 64ce100..e697b54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
@@ -78,7 +78,7 @@ public class LogDivertAppender {
      */
     private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|").
         join(new String[]{"org.apache.hadoop.mapreduce.JobSubmitter",
-            "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(),
+            "org.apache.hadoop.mapreduce.Job", "SessionState", "ReplState", Task.class.getName(),
             Driver.class.getName(), "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"}));
 
     /* Patterns that are included in performance logging level.

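The second file swaps the previously borrowed "SessionState" logger for a dedicated "ReplState" logger and hoists the _functions/_metadata constants up to the other class constants. Since SLF4J loggers are looked up by name, requesting the same named logger is all it takes to emit to the diverted stream. A minimal sketch of the pattern the analyzer follows after this change; the logger name and message format are taken from the diff, while the class name ReplStateLogSketch and the "sales" database are hypothetical:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ReplStateLogSketch {
      // Named logger, as in ReplicationSemanticAnalyzer; the name, not the
      // declaring class, is what executionIncludeNamePattern matches.
      private static final Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");

      public static void main(String[] args) {
        String dbName = "sales"; // hypothetical database name
        REPL_STATE_LOG.info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP", dbName);
      }
    }
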
http://git-wip-us.apache.org/repos/asf/hive/blob/d6db6fff/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 5d1d2fd..fdcaa7f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -99,6 +99,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
+  private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
+  private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
+  private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
 
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
@@ -186,6 +189,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         // bootstrap case
         Long bootDumpBeginReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
         for (String dbName : matchesDb(dbNameOrPattern)) {
+          REPL_STATE_LOG.info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP", dbName);
           LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
           Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
           dumpFunctionMetadata(dbName, dumpRoot);
@@ -194,6 +198,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
                 + " to db root " + dbRoot.toUri());
             dumpTbl(ast, dbName, tblName, dbRoot);
           }
+          REPL_STATE_LOG.info("Repl Dump: Completed analyzing Repl Dump for DB: {} and created {} COPY tasks to dump " +
+                              "metadata and data",
+                              dbName, rootTasks.size());
         }
         Long bootDumpEndReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
         LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, bootDumpEndReplId);
@@ -258,6 +265,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
             evFetcher, eventFrom, maxEventLimit, evFilter);
 
         lastReplId = eventTo;
+        REPL_STATE_LOG.info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL",
+                            (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
         while (evIter.hasNext()){
           NotificationEvent ev = evIter.next();
           lastReplId = ev.getEventId();
@@ -265,6 +274,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           dumpEvent(ev, evRoot, cmRoot);
         }
 
+        REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}",
+                            (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
+
         LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
         Utils.writeOutput(
             Arrays.asList(
@@ -294,6 +306,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         getNewEventOnlyReplicationSpec(ev.getEventId())
     );
     EventHandlerFactory.handlerFor(ev).handle(context);
+    REPL_STATE_LOG.info("Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}",
+                        String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString());
   }
 
   public static void injectNextDumpDirForTest(String dumpdir){
@@ -333,6 +347,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
       HiveWrapper.Tuple<Database> database = new HiveWrapper(db, dbName).database();
       EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
+      REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata");
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e);
@@ -340,10 +355,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     return dbRoot;
   }
 
-  private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
-  private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
-  private final static Logger SESSION_STATE_LOG = LoggerFactory.getLogger("SessionState");
-
   private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException {
     Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
     try {
@@ -360,7 +371,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           continue;
         }
         if (tuple.object.getResourceUris().isEmpty()) {
-          SESSION_STATE_LOG.warn(
+          REPL_STATE_LOG.warn(
               "Not replicating function: " + functionName + " as it seems to have been created "
                   + "without USING clause");
           continue;
@@ -372,6 +383,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
             functionMetadataRoot)) {
           new FunctionSerializer(tuple.object).writeTo(jsonWriter, tuple.replicationSpec);
         }
+        REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName);
       }
     } catch (Exception e) {
       throw new SemanticException(e);
@@ -392,8 +404,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     try {
       URI toURI = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
       TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
+
       ExportSemanticAnalyzer.prepareExport(ast, toURI, ts, getNewReplicationSpec(), db, conf, ctx,
           rootTasks, inputs, outputs, LOG);
+      REPL_STATE_LOG.info("Repl Dump: Analyzed dump for table/view: {}.{} and created copy tasks to dump metadata " +
+                          "and data to path {}", dbName, tblName, toURI.toString());
     } catch (HiveException e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e);
@@ -540,10 +555,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         Task<? extends Serializable> taskChainTail = evTaskRoot;
 
         int evstage = 0;
+        int evIter = 0;
         Long lastEvid = null;
         Map<String,Long> dbsUpdated = new ReplicationSpec.ReplStateMap<String,Long>();
         Map<String,Long> tablesUpdated = new ReplicationSpec.ReplStateMap<String,Long>();
 
+        REPL_STATE_LOG.info("Repl Load: Started analyzing Repl load for DB: {} from path {}, Dump Type: INCREMENTAL",
+                (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?",
+                loadPath.toUri().toString());
         for (FileStatus dir : dirsInLoadPath){
           LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
           // event loads will behave similar to table loads, with one crucial difference
@@ -570,6 +589,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
               dbNameOrPattern, tblNameOrPattern, locn, taskChainTail,
               dbsUpdated, tablesUpdated, eventDmd);
+          evIter++;
+          REPL_STATE_LOG.info("Repl Load: Analyzed load for event {}/{} " +
+                              "with ID: {}, Type: {}, Path: {}",
+                              evIter, dirsInLoadPath.length,
+                              dir.getPath().getName(), eventDmd.getDumpType().toString(), locn);
+
           LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0);
           if ((evTasks != null) && (!evTasks.isEmpty())){
             Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
@@ -657,6 +682,10 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           taskChainTail = updateReplIdTask;
         }
         rootTasks.add(evTaskRoot);
+        REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl load for DB: {} from path {} and created import " +
+                            "(DDL/COPY/MOVE) tasks",
+                            (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?",
+                            loadPath.toUri().toString());
       }
 
     } catch (Exception e) {
@@ -742,6 +771,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         dbName = dbObj.getName();
       }
 
+      REPL_STATE_LOG.info("Repl Load: Started analyzing Repl Load for DB: {} from Dump Dir: {}, Dump Type: BOOTSTRAP",
+                          dbName, dir.getPath().toUri().toString());
+
       Task<? extends Serializable> dbRootTask = null;
       if (existEmptyDb(dbName)) {
         AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, dbObj.getParameters());
@@ -766,6 +798,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       for (FileStatus tableDir : Collections2.filter(Arrays.asList(dirsInDbPath), new TableDirPredicate())) {
         analyzeTableLoad(
             dbName, null, tableDir.getPath().toUri().toString(), dbRootTask, null, null);
+        REPL_STATE_LOG.info("Repl Load: Analyzed table/view/partition load from path {}",
+                            tableDir.getPath().toUri().toString());
       }
 
       //Function load
@@ -775,8 +809,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
             Arrays.asList(fs.listStatus(functionMetaDataRoot, EximUtil.getDirectoryFilter(fs)));
         for (FileStatus functionDir : functionDirectories) {
           analyzeFunctionLoad(dbName, functionDir, dbRootTask);
+          REPL_STATE_LOG.info("Repl Load: Analyzed function load from path {}",
+                              functionDir.getPath().toUri().toString());
         }
       }
+
+      REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl Load for DB: {} and created import (DDL/COPY/MOVE) tasks",
+              dbName);
     } catch (Exception e) {
       throw new SemanticException(e);
     }
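
With both pieces in place, a client that fetches HS2 operation logs sees replication progress while a REPL DUMP or REPL LOAD statement runs. An illustrative bootstrap-dump transcript, assembled from the format strings added in this commit; the database name, function name, and task count are hypothetical:

    REPL DUMP sales;
    Repl Dump: Started analyzing Repl Dump for DB: sales, Dump Type: BOOTSTRAP
    Repl Dump: Dumped DB metadata
    Repl Dump: Dumped metadata for function: my_udf
    Repl Dump: Completed analyzing Repl Dump for DB: sales and created 4 COPY tasks to dump metadata and data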