You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2017/05/09 06:14:28 UTC
hive git commit: HIVE-16530 : Add HS2 operation logs and improve logs for REPL commands (Sankar Hariappan via Thejas Nair)
Repository: hive
Updated Branches:
refs/heads/master db7b57cd9 -> d6db6ffff
HIVE-16530 : Add HS2 operation logs and improve logs for REPL commands (Sankar Hariappan via Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d6db6fff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d6db6fff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d6db6fff
Branch: refs/heads/master
Commit: d6db6ffff630bf377b157e9e971d48e1a6d10209
Parents: db7b57c
Author: Sankar Hariappan <ma...@gmail.com>
Authored: Mon May 8 23:14:19 2017 -0700
Committer: Thejas M Nair <th...@hortonworks.com>
Committed: Mon May 8 23:14:19 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/log/LogDivertAppender.java | 2 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 49 ++++++++++++++++++--
2 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d6db6fff/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
index 64ce100..e697b54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
@@ -78,7 +78,7 @@ public class LogDivertAppender {
*/
private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|").
join(new String[]{"org.apache.hadoop.mapreduce.JobSubmitter",
- "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(),
+ "org.apache.hadoop.mapreduce.Job", "SessionState", "ReplState", Task.class.getName(),
Driver.class.getName(), "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"}));
/* Patterns that are included in performance logging level.
http://git-wip-us.apache.org/repos/asf/hive/blob/d6db6fff/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 5d1d2fd..fdcaa7f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -99,6 +99,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
+ private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
+ private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
+ private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
@@ -186,6 +189,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// bootstrap case
Long bootDumpBeginReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
for (String dbName : matchesDb(dbNameOrPattern)) {
+ REPL_STATE_LOG.info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP", dbName);
LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
dumpFunctionMetadata(dbName, dumpRoot);
@@ -194,6 +198,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
+ " to db root " + dbRoot.toUri());
dumpTbl(ast, dbName, tblName, dbRoot);
}
+ REPL_STATE_LOG.info("Repl Dump: Completed analyzing Repl Dump for DB: {} and created {} COPY tasks to dump " +
+ "metadata and data",
+ dbName, rootTasks.size());
}
Long bootDumpEndReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, bootDumpEndReplId);
@@ -258,6 +265,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
evFetcher, eventFrom, maxEventLimit, evFilter);
lastReplId = eventTo;
+ REPL_STATE_LOG.info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL",
+ (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
while (evIter.hasNext()){
NotificationEvent ev = evIter.next();
lastReplId = ev.getEventId();
@@ -265,6 +274,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
dumpEvent(ev, evRoot, cmRoot);
}
+ REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}",
+ (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
+
LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
Utils.writeOutput(
Arrays.asList(
@@ -294,6 +306,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
getNewEventOnlyReplicationSpec(ev.getEventId())
);
EventHandlerFactory.handlerFor(ev).handle(context);
+ REPL_STATE_LOG.info("Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}",
+ String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString());
}
public static void injectNextDumpDirForTest(String dumpdir){
@@ -333,6 +347,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
HiveWrapper.Tuple<Database> database = new HiveWrapper(db, dbName).database();
EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
+ REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata");
} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
throw new SemanticException(e);
@@ -340,10 +355,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
return dbRoot;
}
- private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
- private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
- private final static Logger SESSION_STATE_LOG = LoggerFactory.getLogger("SessionState");
-
private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException {
Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
try {
@@ -360,7 +371,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
continue;
}
if (tuple.object.getResourceUris().isEmpty()) {
- SESSION_STATE_LOG.warn(
+ REPL_STATE_LOG.warn(
"Not replicating function: " + functionName + " as it seems to have been created "
+ "without USING clause");
continue;
@@ -372,6 +383,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
functionMetadataRoot)) {
new FunctionSerializer(tuple.object).writeTo(jsonWriter, tuple.replicationSpec);
}
+ REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName);
}
} catch (Exception e) {
throw new SemanticException(e);
@@ -392,8 +404,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
try {
URI toURI = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
+
ExportSemanticAnalyzer.prepareExport(ast, toURI, ts, getNewReplicationSpec(), db, conf, ctx,
rootTasks, inputs, outputs, LOG);
+ REPL_STATE_LOG.info("Repl Dump: Analyzed dump for table/view: {}.{} and created copy tasks to dump metadata " +
+ "and data to path {}", dbName, tblName, toURI.toString());
} catch (HiveException e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
throw new SemanticException(e);
@@ -540,10 +555,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
Task<? extends Serializable> taskChainTail = evTaskRoot;
int evstage = 0;
+ int evIter = 0;
Long lastEvid = null;
Map<String,Long> dbsUpdated = new ReplicationSpec.ReplStateMap<String,Long>();
Map<String,Long> tablesUpdated = new ReplicationSpec.ReplStateMap<String,Long>();
+ REPL_STATE_LOG.info("Repl Load: Started analyzing Repl load for DB: {} from path {}, Dump Type: INCREMENTAL",
+ (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?",
+ loadPath.toUri().toString());
for (FileStatus dir : dirsInLoadPath){
LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
// event loads will behave similar to table loads, with one crucial difference
@@ -570,6 +589,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
dbNameOrPattern, tblNameOrPattern, locn, taskChainTail,
dbsUpdated, tablesUpdated, eventDmd);
+ evIter++;
+ REPL_STATE_LOG.info("Repl Load: Analyzed load for event {}/{} " +
+ "with ID: {}, Type: {}, Path: {}",
+ evIter, dirsInLoadPath.length,
+ dir.getPath().getName(), eventDmd.getDumpType().toString(), locn);
+
LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0);
if ((evTasks != null) && (!evTasks.isEmpty())){
Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
@@ -657,6 +682,10 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
taskChainTail = updateReplIdTask;
}
rootTasks.add(evTaskRoot);
+ REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl load for DB: {} from path {} and created import " +
+ "(DDL/COPY/MOVE) tasks",
+ (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?",
+ loadPath.toUri().toString());
}
} catch (Exception e) {
@@ -742,6 +771,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
dbName = dbObj.getName();
}
+ REPL_STATE_LOG.info("Repl Load: Started analyzing Repl Load for DB: {} from Dump Dir: {}, Dump Type: BOOTSTRAP",
+ dbName, dir.getPath().toUri().toString());
+
Task<? extends Serializable> dbRootTask = null;
if (existEmptyDb(dbName)) {
AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, dbObj.getParameters());
@@ -766,6 +798,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
for (FileStatus tableDir : Collections2.filter(Arrays.asList(dirsInDbPath), new TableDirPredicate())) {
analyzeTableLoad(
dbName, null, tableDir.getPath().toUri().toString(), dbRootTask, null, null);
+ REPL_STATE_LOG.info("Repl Load: Analyzed table/view/partition load from path {}",
+ tableDir.getPath().toUri().toString());
}
//Function load
@@ -775,8 +809,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
Arrays.asList(fs.listStatus(functionMetaDataRoot, EximUtil.getDirectoryFilter(fs)));
for (FileStatus functionDir : functionDirectories) {
analyzeFunctionLoad(dbName, functionDir, dbRootTask);
+ REPL_STATE_LOG.info("Repl Load: Analyzed function load from path {}",
+ functionDir.getPath().toUri().toString());
}
}
+
+ REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl Load for DB: {} and created import (DDL/COPY/MOVE) tasks",
+ dbName);
} catch (Exception e) {
throw new SemanticException(e);
}