You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/02/03 14:53:07 UTC

[03/50] [abbrv] ambari git commit: AMBARI-19780. Hive View : Logs are not complete for hive view. (gauravn7)

AMBARI-19780. Hive View : Logs are not complete for hive view. (gauravn7)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0dfe8b6c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0dfe8b6c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0dfe8b6c

Branch: refs/heads/branch-dev-patch-upgrade
Commit: 0dfe8b6c9792aa2d40d8ecaa75f0256e9820eb89
Parents: 01f4a69
Author: Gaurav Nagar <gr...@gmail.com>
Authored: Wed Feb 1 12:37:21 2017 +0530
Committer: Gaurav Nagar <gr...@gmail.com>
Committed: Wed Feb 1 12:37:21 2017 +0530

----------------------------------------------------------------------
 .../ambari/view/hive2/actor/LogAggregator.java    | 18 ++++++++++++------
 .../view/hive2/actor/StatementExecutor.java       |  9 ++++++---
 .../hive2/actor/message/StartLogAggregation.java  | 10 +++++++++-
 .../ambari/view/hive20/actor/LogAggregator.java   | 18 ++++++++++++------
 .../view/hive20/actor/StatementExecutor.java      | 10 +++++++---
 .../hive20/actor/message/StartLogAggregation.java | 10 +++++++++-
 6 files changed, 55 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
index 9412f81..69b4a56 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
@@ -29,6 +29,7 @@ import org.apache.ambari.view.hive2.actor.message.StartLogAggregation;
 import org.apache.ambari.view.utils.hdfs.HdfsApi;
 import org.apache.ambari.view.utils.hdfs.HdfsApiException;
 import org.apache.ambari.view.utils.hdfs.HdfsUtil;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hive.jdbc.HiveStatement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,17 +48,17 @@ public class LogAggregator extends HiveActor {
 
   public static final int AGGREGATION_INTERVAL = 5 * 1000;
   private final HdfsApi hdfsApi;
-  private final HiveStatement statement;
+  private HiveStatement statement;
   private final String logFile;
 
   private Cancellable moreLogsScheduler;
   private ActorRef parent;
   private boolean hasStartedFetching = false;
   private boolean shouldFetchMore = true;
+  private String allLogs = "";
 
-  public LogAggregator(HdfsApi hdfsApi, HiveStatement statement, String logFile) {
+  public LogAggregator(HdfsApi hdfsApi, String logFile) {
     this.hdfsApi = hdfsApi;
-    this.statement = statement;
     this.logFile = logFile;
   }
 
@@ -65,7 +66,7 @@ public class LogAggregator extends HiveActor {
   public void handleMessage(HiveMessage hiveMessage) {
     Object message = hiveMessage.getMessage();
     if (message instanceof StartLogAggregation) {
-      start();
+      start((StartLogAggregation) message);
     }
 
     if (message instanceof GetMoreLogs) {
@@ -79,10 +80,15 @@ public class LogAggregator extends HiveActor {
     }
   }
 
-  private void start() {
+  private void start(StartLogAggregation message) {
+    this.statement = message.getHiveStatement();
     parent = this.getSender();
     hasStartedFetching = false;
     shouldFetchMore = true;
+    String logTitle = "Logs for Query '" + message.getStatement() + "'";
+    String repeatSeperator = StringUtils.repeat("=", logTitle.length());
+    allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle, repeatSeperator);
+
     if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) {
       moreLogsScheduler.cancel();
     }
@@ -94,7 +100,7 @@ public class LogAggregator extends HiveActor {
   private void getMoreLogs() throws SQLException, HdfsApiException {
     List<String> logs = statement.getQueryLog();
     if (logs.size() > 0 && shouldFetchMore) {
-      String allLogs = Joiner.on("\n").skipNulls().join(logs);
+      allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs);
       HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
       if(!statement.hasMoreLogs()) {
         shouldFetchMore = false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
index d7b4f54..6cdee81 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
@@ -96,7 +96,6 @@ public class StatementExecutor extends HiveActor {
       LOG.error("Failed to execute statement: {}. {}", message.getStatement(), e);
       sender().tell(new ResultInformation(message.getId(), new Failure("Failed to execute statement: " + message.getStatement(), e)), self());
     } finally {
-      stopLogAggregation();
       stopGUIDFetch();
     }
   }
@@ -120,11 +119,11 @@ public class StatementExecutor extends HiveActor {
   private void startLogAggregation(HiveStatement statement, String sqlStatement, String logFile) {
     if (logAggregator == null) {
       logAggregator = getContext().actorOf(
-        Props.create(LogAggregator.class, hdfsApi, statement, logFile)
+        Props.create(LogAggregator.class, hdfsApi, logFile)
           .withDispatcher("akka.actor.misc-dispatcher"), "LogAggregator:" + UUID.randomUUID().toString());
     }
     LOG.info("Fetching query logs for statement: {}", sqlStatement);
-    logAggregator.tell(new StartLogAggregation(sqlStatement), getSelf());
+    logAggregator.tell(new StartLogAggregation(sqlStatement, statement), getSelf());
   }
 
   private void stopLogAggregation() {
@@ -134,6 +133,10 @@ public class StatementExecutor extends HiveActor {
     logAggregator = null;
   }
 
+  @Override
+  public void postStop() throws Exception {
+    stopLogAggregation();
+  }
 
   private void getColumnMetaData(GetColumnMetadataJob message) {
     try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
index b56da08..48fbced 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java
@@ -18,14 +18,22 @@
 
 package org.apache.ambari.view.hive2.actor.message;
 
+import org.apache.hive.jdbc.HiveStatement;
+
 public class StartLogAggregation {
   private String statement;
+  private HiveStatement hiveStatement;
 
   public StartLogAggregation() {
   }
 
-  public StartLogAggregation(String statement) {
+  public StartLogAggregation(String statement, HiveStatement hiveStatement) {
     this.statement = statement;
+    this.hiveStatement = hiveStatement;
+  }
+
+  public HiveStatement getHiveStatement() {
+    return hiveStatement;
   }
 
   public String getStatement() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
index f9c21b4..600ea64 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java
@@ -28,6 +28,7 @@ import org.apache.ambari.view.hive20.actor.message.StartLogAggregation;
 import org.apache.ambari.view.utils.hdfs.HdfsApi;
 import org.apache.ambari.view.utils.hdfs.HdfsApiException;
 import org.apache.ambari.view.utils.hdfs.HdfsUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hive.jdbc.HiveStatement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,17 +47,17 @@ public class LogAggregator extends HiveActor {
 
   public static final int AGGREGATION_INTERVAL = 5 * 1000;
   private final HdfsApi hdfsApi;
-  private final HiveStatement statement;
+  private HiveStatement statement;
   private final String logFile;
 
   private Cancellable moreLogsScheduler;
   private ActorRef parent;
   private boolean hasStartedFetching = false;
   private boolean shouldFetchMore = true;
+  private String allLogs = "";
 
-  public LogAggregator(HdfsApi hdfsApi, HiveStatement statement, String logFile) {
+  public LogAggregator(HdfsApi hdfsApi, String logFile) {
     this.hdfsApi = hdfsApi;
-    this.statement = statement;
     this.logFile = logFile;
   }
 
@@ -64,7 +65,7 @@ public class LogAggregator extends HiveActor {
   public void handleMessage(HiveMessage hiveMessage) {
     Object message = hiveMessage.getMessage();
     if (message instanceof StartLogAggregation) {
-      start();
+      start((StartLogAggregation) message);
     }
 
     if (message instanceof GetMoreLogs) {
@@ -79,10 +80,15 @@ public class LogAggregator extends HiveActor {
     }
   }
 
-  private void start() {
+  private void start(StartLogAggregation message) {
+    this.statement = message.getHiveStatement();
     parent = this.getSender();
     hasStartedFetching = false;
     shouldFetchMore = true;
+    String logTitle = "Logs for Query '" + message.getStatement() + "'";
+    String repeatSeperator = StringUtils.repeat("=", logTitle.length());
+    allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle, repeatSeperator);
+
     if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) {
       moreLogsScheduler.cancel();
     }
@@ -94,7 +100,7 @@ public class LogAggregator extends HiveActor {
   private void getMoreLogs() throws SQLException, HdfsApiException {
     List<String> logs = statement.getQueryLog();
     if (logs.size() > 0 && shouldFetchMore) {
-      String allLogs = Joiner.on("\n").skipNulls().join(logs);
+      allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs);
       HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
       if(!statement.hasMoreLogs()) {
         shouldFetchMore = false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
index 03332d9..c3ed14b 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java
@@ -96,7 +96,6 @@ public class StatementExecutor extends HiveActor {
       LOG.error("Failed to execute statement: {}. {}", message.getStatement(), e);
       sender().tell(new ResultInformation(message.getId(), new Failure("Failed to execute statement: " + message.getStatement(), e)), self());
     } finally {
-      stopLogAggregation();
       stopGUIDFetch();
     }
   }
@@ -120,11 +119,11 @@ public class StatementExecutor extends HiveActor {
   private void startLogAggregation(HiveStatement statement, String sqlStatement, String logFile) {
     if (logAggregator == null) {
       logAggregator = getContext().actorOf(
-        Props.create(LogAggregator.class, hdfsApi, statement, logFile)
+        Props.create(LogAggregator.class, hdfsApi, logFile)
           .withDispatcher("akka.actor.misc-dispatcher"), "LogAggregator:" + UUID.randomUUID().toString());
     }
     LOG.info("Fetching query logs for statement: {}", sqlStatement);
-    logAggregator.tell(new StartLogAggregation(sqlStatement), getSelf());
+    logAggregator.tell(new StartLogAggregation(sqlStatement, statement), getSelf());
   }
 
   private void stopLogAggregation() {
@@ -134,6 +133,11 @@ public class StatementExecutor extends HiveActor {
     logAggregator = null;
   }
 
+  @Override
+  public void postStop() throws Exception {
+    stopLogAggregation();
+  }
+
 
   private void getColumnMetaData(GetColumnMetadataJob message) {
     try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java
index 922ad1d..8aab04f 100644
--- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java
@@ -18,14 +18,22 @@
 
 package org.apache.ambari.view.hive20.actor.message;
 
+import org.apache.hive.jdbc.HiveStatement;
+
 public class StartLogAggregation {
   private String statement;
+  private HiveStatement hiveStatement;
 
   public StartLogAggregation() {
   }
 
-  public StartLogAggregation(String statement) {
+  public StartLogAggregation(String statement, HiveStatement hiveStatement) {
     this.statement = statement;
+    this.hiveStatement = hiveStatement;
+  }
+
+  public HiveStatement getHiveStatement() {
+    return hiveStatement;
   }
 
   public String getStatement() {