You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by gn...@apache.org on 2017/02/01 07:07:39 UTC
ambari git commit: AMBARI-19780. Hive View : Logs are not complete for hive view. (gauravn7)
Repository: ambari
Updated Branches:
refs/heads/trunk 01f4a69e2 -> 0dfe8b6c9
AMBARI-19780. Hive View : Logs are not complete for hive view. (gauravn7)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0dfe8b6c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0dfe8b6c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0dfe8b6c
Branch: refs/heads/trunk
Commit: 0dfe8b6c9792aa2d40d8ecaa75f0256e9820eb89
Parents: 01f4a69
Author: Gaurav Nagar <gr...@gmail.com>
Authored: Wed Feb 1 12:37:21 2017 +0530
Committer: Gaurav Nagar <gr...@gmail.com>
Committed: Wed Feb 1 12:37:21 2017 +0530
----------------------------------------------------------------------
.../ambari/view/hive2/actor/LogAggregator.java | 18 ++++++++++++------
.../view/hive2/actor/StatementExecutor.java | 9 ++++++---
.../hive2/actor/message/StartLogAggregation.java | 10 +++++++++-
.../ambari/view/hive20/actor/LogAggregator.java | 18 ++++++++++++------
.../view/hive20/actor/StatementExecutor.java | 10 +++++++---
.../hive20/actor/message/StartLogAggregation.java | 10 +++++++++-
6 files changed, 55 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
index 9412f81..69b4a56 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
@@ -29,6 +29,7 @@ import org.apache.ambari.view.hive2.actor.message.StartLogAggregation;
import org.apache.ambari.view.utils.hdfs.HdfsApi;
import org.apache.ambari.view.utils.hdfs.HdfsApiException;
import org.apache.ambari.view.utils.hdfs.HdfsUtil;
+import org.apache.commons.lang.StringUtils;
import org.apache.hive.jdbc.HiveStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,17 +48,17 @@ public class LogAggregator extends HiveActor {
public static final int AGGREGATION_INTERVAL = 5 * 1000;
private final HdfsApi hdfsApi;
- private final HiveStatement statement;
+ private HiveStatement statement;
private final String logFile;
private Cancellable moreLogsScheduler;
private ActorRef parent;
private boolean hasStartedFetching = false;
private boolean shouldFetchMore = true;
+ private String allLogs = "";
- public LogAggregator(HdfsApi hdfsApi, HiveStatement statement, String logFile) {
+ public LogAggregator(HdfsApi hdfsApi, String logFile) {
this.hdfsApi = hdfsApi;
- this.statement = statement;
this.logFile = logFile;
}
@@ -65,7 +66,7 @@ public class LogAggregator extends HiveActor {
public void handleMessage(HiveMessage hiveMessage) {
Object message = hiveMessage.getMessage();
if (message instanceof StartLogAggregation) {
- start();
+ start((StartLogAggregation) message);
}
if (message instanceof GetMoreLogs) {
@@ -79,10 +80,15 @@ public class LogAggregator extends HiveActor {
}
}
- private void start() {
+ private void start(StartLogAggregation message) {
+ this.statement = message.getHiveStatement();
parent = this.getSender();
hasStartedFetching = false;
shouldFetchMore = true;
+ String logTitle = "Logs for Query '" + message.getStatement() + "'";
+ String repeatSeperator = StringUtils.repeat("=", logTitle.length());
+ allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle, repeatSeperator);
+
if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) {
moreLogsScheduler.cancel();
}
@@ -94,7 +100,7 @@ public class LogAggregator extends HiveActor {
private void getMoreLogs() throws SQLException, HdfsApiException {
List<String> logs = statement.getQueryLog();
if (logs.size() > 0 && shouldFetchMore) {
- String allLogs = Joiner.on("\n").skipNulls().join(logs);
+ allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs);
HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
if(!statement.hasMoreLogs()) {
shouldFetchMore = false;
http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
index d7b4f54..6cdee81 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
@@ -96,7 +96,6 @@ public class StatementExecutor extends HiveActor {
LOG.error("Failed to execute statement: {}. {}", message.getStatement(), e);
sender().tell(new ResultInformation(message.getId(), new Failure("Failed to execute statement: " + message.getStatement(), e)), self());
} finally {
- stopLogAggregation();
stopGUIDFetch();
}
}
@@ -120,11 +119,11 @@ public class StatementExecutor extends HiveActor {
private void startLogAggregation(HiveStatement statement, String sqlStatement, String logFile) {
if (logAggregator == null) {
logAggregator = getContext().actorOf(
- Props.create(LogAggregator.class, hdfsApi, statement, logFile)
+ Props.create(LogAggregator.class, hdfsApi, logFile)
.withDispatcher("akka.actor.misc-dispatcher"), "LogAggregator:" + UUID.randomUUID().toString());
}
LOG.info("Fetching query logs for statement: {}", sqlStatement);
- logAggregator.tell(new StartLogAggregation(sqlStatement), getSelf());
+ logAggregator.tell(new StartLogAggregation(sqlStatement, statement), getSelf());
}
private void stopLogAggregation() {
@@ -134,6 +133,10 @@ public class StatementExecutor extends HiveActor {
logAggregator = null;
}
+ @Override
+ public void postStop() throws Exception {
+ stopLogAggregation();
+ }
private void getColumnMetaData(GetColumnMetadataJob message) {
try {
http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
index b56da08..48fbced 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
@@ -18,14 +18,22 @@
package org.apache.ambari.view.hive2.actor.message;
+import org.apache.hive.jdbc.HiveStatement;
+
public class StartLogAggregation {
private String statement;
+ private HiveStatement hiveStatement;
public StartLogAggregation() {
}
- public StartLogAggregation(String statement) {
+ public StartLogAggregation(String statement, HiveStatement hiveStatement) {
this.statement = statement;
+ this.hiveStatement = hiveStatement;
+ }
+
+ public HiveStatement getHiveStatement() {
+ return hiveStatement;
}
public String getStatement() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
index f9c21b4..600ea64 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
@@ -28,6 +28,7 @@ import org.apache.ambari.view.hive20.actor.message.StartLogAggregation;
import org.apache.ambari.view.utils.hdfs.HdfsApi;
import org.apache.ambari.view.utils.hdfs.HdfsApiException;
import org.apache.ambari.view.utils.hdfs.HdfsUtil;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hive.jdbc.HiveStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,17 +47,17 @@ public class LogAggregator extends HiveActor {
public static final int AGGREGATION_INTERVAL = 5 * 1000;
private final HdfsApi hdfsApi;
- private final HiveStatement statement;
+ private HiveStatement statement;
private final String logFile;
private Cancellable moreLogsScheduler;
private ActorRef parent;
private boolean hasStartedFetching = false;
private boolean shouldFetchMore = true;
+ private String allLogs = "";
- public LogAggregator(HdfsApi hdfsApi, HiveStatement statement, String logFile) {
+ public LogAggregator(HdfsApi hdfsApi, String logFile) {
this.hdfsApi = hdfsApi;
- this.statement = statement;
this.logFile = logFile;
}
@@ -64,7 +65,7 @@ public class LogAggregator extends HiveActor {
public void handleMessage(HiveMessage hiveMessage) {
Object message = hiveMessage.getMessage();
if (message instanceof StartLogAggregation) {
- start();
+ start((StartLogAggregation) message);
}
if (message instanceof GetMoreLogs) {
@@ -79,10 +80,15 @@ public class LogAggregator extends HiveActor {
}
}
- private void start() {
+ private void start(StartLogAggregation message) {
+ this.statement = message.getHiveStatement();
parent = this.getSender();
hasStartedFetching = false;
shouldFetchMore = true;
+ String logTitle = "Logs for Query '" + message.getStatement() + "'";
+ String repeatSeperator = StringUtils.repeat("=", logTitle.length());
+ allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle, repeatSeperator);
+
if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) {
moreLogsScheduler.cancel();
}
@@ -94,7 +100,7 @@ public class LogAggregator extends HiveActor {
private void getMoreLogs() throws SQLException, HdfsApiException {
List<String> logs = statement.getQueryLog();
if (logs.size() > 0 && shouldFetchMore) {
- String allLogs = Joiner.on("\n").skipNulls().join(logs);
+ allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs);
HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
if(!statement.hasMoreLogs()) {
shouldFetchMore = false;
http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
index 03332d9..c3ed14b 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
@@ -96,7 +96,6 @@ public class StatementExecutor extends HiveActor {
LOG.error("Failed to execute statement: {}. {}", message.getStatement(), e);
sender().tell(new ResultInformation(message.getId(), new Failure("Failed to execute statement: " + message.getStatement(), e)), self());
} finally {
- stopLogAggregation();
stopGUIDFetch();
}
}
@@ -120,11 +119,11 @@ public class StatementExecutor extends HiveActor {
private void startLogAggregation(HiveStatement statement, String sqlStatement, String logFile) {
if (logAggregator == null) {
logAggregator = getContext().actorOf(
- Props.create(LogAggregator.class, hdfsApi, statement, logFile)
+ Props.create(LogAggregator.class, hdfsApi, logFile)
.withDispatcher("akka.actor.misc-dispatcher"), "LogAggregator:" + UUID.randomUUID().toString());
}
LOG.info("Fetching query logs for statement: {}", sqlStatement);
- logAggregator.tell(new StartLogAggregation(sqlStatement), getSelf());
+ logAggregator.tell(new StartLogAggregation(sqlStatement, statement), getSelf());
}
private void stopLogAggregation() {
@@ -134,6 +133,11 @@ public class StatementExecutor extends HiveActor {
logAggregator = null;
}
+ @Override
+ public void postStop() throws Exception {
+ stopLogAggregation();
+ }
+
private void getColumnMetaData(GetColumnMetadataJob message) {
try {
http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java
index 922ad1d..8aab04f 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java
@@ -18,14 +18,22 @@
package org.apache.ambari.view.hive20.actor.message;
+import org.apache.hive.jdbc.HiveStatement;
+
public class StartLogAggregation {
private String statement;
+ private HiveStatement hiveStatement;
public StartLogAggregation() {
}
- public StartLogAggregation(String statement) {
+ public StartLogAggregation(String statement, HiveStatement hiveStatement) {
this.statement = statement;
+ this.hiveStatement = hiveStatement;
+ }
+
+ public HiveStatement getHiveStatement() {
+ return hiveStatement;
}
public String getStatement() {