Posted to commits@hive.apache.org by ha...@apache.org on 2020/07/08 22:03:33 UTC

[hive] branch master updated: HIVE-23277 : HiveProtoLogger should carry out JSON conversion in its own thread ( Attila Magyar via Rajesh Balamohan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 58e532e  HIVE-23277 : HiveProtoLogger should carry out JSON conversion in its own thread ( Attila Magyar via Rajesh Balamohan)
58e532e is described below

commit 58e532e17fd5f8c0d6188756c1a9869b467dfaff
Author: Attila Magyar <am...@cloudera.com>
AuthorDate: Wed Jul 8 15:01:49 2020 -0700

    HIVE-23277 : HiveProtoLogger should carry out JSON conversion in its own thread ( Attila Magyar via Rajesh Balamohan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../apache/hadoop/hive/ql/exec/ExplainTask.java    | 20 ++++-
 .../ql/hooks/HiveHookEventProtoPartialBuilder.java | 86 ++++++++++++++++++++++
 .../hadoop/hive/ql/hooks/HiveProtoLoggingHook.java | 67 +++++++----------
 .../ql/optimizer/physical/StageIDsRearranger.java  |  8 +-
 .../TestHiveHookEventProtoPartialBuilder.java      | 82 +++++++++++++++++++++
 .../hive/ql/hooks/TestHiveProtoLoggingHook.java    |  1 +
 6 files changed, 216 insertions(+), 48 deletions(-)
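
In short: before this patch, getPreHookEvent() rendered the full EXPLAIN plan to JSON while still on the thread that fired the hook. The patch splits event construction in two. The hook thread captures a cheap buildPartial() snapshot of the protobuf plus the raw inputs (an ExplainWork, the pending JSONObjects, the query string) in the new HiveHookEventProtoPartialBuilder, and the JSON conversion runs later, inside build(), when writeEvent() executes on the logger's own thread (per the commit title; the executor wiring is unchanged and not shown in this diff). A minimal, JDK-only sketch of that handoff pattern (names are illustrative, not Hive API):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Supplier;

    public class DeferredSerializationSketch {

      /** Cheap snapshot taken on the hook thread; toJson stays unevaluated. */
      static class PartialEvent {
        final String queryId;
        final Supplier<String> toJson; // the expensive conversion, deferred

        PartialEvent(String queryId, Supplier<String> toJson) {
          this.queryId = queryId;
          this.toJson = toJson;
        }
      }

      private static final ExecutorService LOGGER_THREAD = Executors.newSingleThreadExecutor();

      static void submit(PartialEvent event) {
        // The conversion runs here, off the query's critical path.
        LOGGER_THREAD.execute(() -> System.out.println(event.queryId + " -> " + event.toJson.get()));
      }

      public static void main(String[] args) {
        submit(new PartialEvent("query-1", () -> "{\"queryPlan\":\"...\"}"));
        LOGGER_THREAD.shutdown();
      }
    }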

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index 750abcb..f2ed01a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -244,9 +244,25 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
         work.getCboPlan(), work.getOptimizedSQL());
   }
 
+  public JSONObject getJSONPlan(PrintStream out, ExplainWork work, String stageIdRearrange)
+          throws Exception {
+    return getJSONPlan(out, work.getRootTasks(), work.getFetchTask(),
+            work.isFormatted(), work.getExtended(), work.isAppendTaskType(), work.getCboInfo(),
+            work.getCboPlan(), work.getOptimizedSQL(), stageIdRearrange);
+  }
+
+  public JSONObject getJSONPlan(PrintStream out, List<Task<?>> tasks, Task<?> fetchTask,
+                                boolean jsonOutput, boolean isExtended, boolean appendTaskType, String cboInfo,
+                                String cboPlan, String optimizedSQL) throws Exception {
+    return getJSONPlan(
+            out, tasks, fetchTask, jsonOutput, isExtended,
+            appendTaskType, cboInfo, cboPlan, optimizedSQL,
+            conf.getVar(ConfVars.HIVESTAGEIDREARRANGE));
+  }
+
   public JSONObject getJSONPlan(PrintStream out, List<Task<?>> tasks, Task<?> fetchTask,
       boolean jsonOutput, boolean isExtended, boolean appendTaskType, String cboInfo,
-      String cboPlan, String optimizedSQL) throws Exception {
+      String cboPlan, String optimizedSQL, String stageIdRearrange) throws Exception {
 
     // If the user asked for a formatted output, dump the json output
     // in the output stream
@@ -274,7 +290,7 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
       }
     }
 
-    List<Task> ordered = StageIDsRearranger.getExplainOrder(conf, tasks);
+    List<Task> ordered = StageIDsRearranger.getExplainOrder(tasks, stageIdRearrange);
 
     if (fetchTask != null) {
       fetchTask.setParentTasks((List)StageIDsRearranger.getFetchSources(tasks));
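
The two overloads added above exist so the stage-ID rearrangement mode can be supplied by the caller instead of being read from the task's own conf: the retained signature resolves HIVESTAGEIDREARRANGE at the boundary and delegates, while the new ExplainWork variant lets HiveHookEventProtoPartialBuilder (next file) pass a value captured earlier on the hook thread. A sketch of the new call shape, mirroring getExplainJSON() further down:

    import org.apache.hadoop.hive.ql.exec.ExplainTask;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.plan.ExplainWork;
    import org.json.JSONObject;

    class JsonPlanSketch {
      // stageIdRearrange is the already-resolved HIVESTAGEIDREARRANGE value,
      // so no HiveConf lookup happens on the calling thread.
      static JSONObject plan(ExplainWork work, String stageIdRearrange) throws Exception {
        ExplainTask explain = (ExplainTask) TaskFactory.get(work, null);
        return explain.getJSONPlan(null /* no output stream */, work, stageIdRearrange);
      }
    }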
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveHookEventProtoPartialBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveHookEventProtoPartialBuilder.java
new file mode 100644
index 0000000..9b9b4e1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveHookEventProtoPartialBuilder.java
@@ -0,0 +1,86 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.OtherInfoType;
+import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents;
+import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveHookEventProtoPartialBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveHookEventProtoPartialBuilder.class.getName());
+  private final HiveHookEvents.HiveHookEventProto event;
+  @Nullable
+  private final ExplainWork explainWork;
+  private final Map<OtherInfoType, JSONObject> otherInfo;
+  private final String queryStr;
+  private final String stageIdRearrange;
+
+  public HiveHookEventProtoPartialBuilder(HiveHookEvents.HiveHookEventProto.Builder builder, ExplainWork explainWork, Map<OtherInfoType, JSONObject> otherInfo, String queryStr, String stageIdRearrange) {
+    this.event = builder.buildPartial();
+    this.explainWork = explainWork;
+    this.otherInfo = otherInfo;
+    this.queryStr = queryStr;
+    this.stageIdRearrange = stageIdRearrange;
+  }
+
+  public HiveHookEvents.HiveHookEventProto build() {
+    if (explainWork != null) {
+      addQueryObj(explainWork);
+    }
+    HiveHookEvents.HiveHookEventProto.Builder builder = HiveHookEvents.HiveHookEventProto.newBuilder();
+    for (Map.Entry<OtherInfoType, JSONObject> each : otherInfo.entrySet()) {
+        OtherInfoType type = each.getKey();
+        JSONObject json = each.getValue();
+        try {
+          // json conversion can be expensive, doing it separately
+          HiveProtoLoggingHook.EventLogger.addMapEntry(builder, type, json.toString());
+        } catch (Exception e) {
+          LOG.error("Unexpected exception while serializing json.", e);
+        }
+    }
+    return builder.mergeFrom(event).build();
+  }
+
+  private void addQueryObj(ExplainWork explainWork) {
+    try {
+      JSONObject queryObj = new JSONObject();
+      queryObj.put("queryText", queryStr);
+      queryObj.put("queryPlan", getExplainJSON(explainWork));
+      otherInfo.put(OtherInfoType.QUERY, queryObj);
+    } catch (Exception e) {
+      LOG.error("Unexpected exception while serializing json.", e);
+    }
+  }
+
+  private JSONObject getExplainJSON(ExplainWork explainWork) throws Exception {
+    ExplainTask explain = (ExplainTask) TaskFactory.get(explainWork, null);
+    return explain.getJSONPlan(null, explainWork, stageIdRearrange);
+  }
+}
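
Two details of the class above are worth calling out: build() is where every JSONObject.toString() runs, so whichever thread calls build() pays the serialization cost; and stageIdRearrange is only dereferenced inside getExplainJSON(), so passing null is safe exactly when explainWork is also null (as the post-hook path below does). A usage sketch under those assumptions (values are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.hooks.HiveHookEventProtoPartialBuilder;
    import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.OtherInfoType;
    import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents;
    import org.json.JSONObject;

    class PartialBuilderUsageSketch {
      static HiveHookEvents.HiveHookEventProto example() {
        // Hook thread: cheap protobuf fields only, JSON kept as objects.
        HiveHookEvents.HiveHookEventProto.Builder builder =
            HiveHookEvents.HiveHookEventProto.newBuilder()
                .setEventType("QUERY_SUBMITTED")
                .setHiveQueryId("query-1");
        Map<OtherInfoType, JSONObject> otherInfo = new HashMap<>();
        otherInfo.put(OtherInfoType.CONF, new JSONObject()); // not yet serialized
        HiveHookEventProtoPartialBuilder partial = new HiveHookEventProtoPartialBuilder(
            builder, null /* no explainWork */, otherInfo, null, null);
        // Logger thread: toString() and the final merge happen here.
        return partial.build();
      }
    }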
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 86a6800..651121b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -88,6 +88,7 @@ import java.net.UnknownHostException;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -105,8 +106,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.ExplainTask;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -252,7 +251,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
         LOG.debug("Not logging events of operation type : {}", plan.getOperationName());
         return;
       }
-      HiveHookEventProto event;
+      HiveHookEventProtoPartialBuilder event;
       switch (hookContext.getHookType()) {
       case PRE_EXEC_HOOK:
         event = getPreHookEvent(hookContext);
@@ -309,7 +308,8 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
     }
 
     private static final int MAX_RETRIES = 2;
-    private void writeEvent(HiveHookEventProto event) {
+    private void writeEvent(HiveHookEventProtoPartialBuilder builder) {
+      HiveHookEventProto event = builder.build();
       for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) {
         try {
           if (eventPerFile) {
@@ -349,7 +349,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       }
     }
 
-    private HiveHookEventProto getPreHookEvent(HookContext hookContext) {
+    private HiveHookEventProtoPartialBuilder getPreHookEvent(HookContext hookContext) {
       QueryPlan plan = hookContext.getQueryPlan();
       LOG.info("Received pre-hook notification for: " + plan.getQueryId());
 
@@ -359,6 +359,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       List<TezTask> tezTasks = Utilities.getTezTasks(plan.getRootTasks());
       ExecutionMode executionMode = getExecutionMode(plan, mrTasks, tezTasks);
 
+      Map<OtherInfoType, JSONObject> otherInfo = new HashMap<>();
       HiveHookEventProto.Builder builder = HiveHookEventProto.newBuilder();
       builder.setEventType(EventType.QUERY_SUBMITTED.name());
       builder.setTimestamp(plan.getQueryStartTime());
@@ -376,15 +377,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
         builder.setOperationId(hookContext.getOperationId());
       }
 
-      try {
-        JSONObject queryObj = new JSONObject();
-        queryObj.put("queryText", plan.getQueryStr());
-        queryObj.put("queryPlan", getExplainPlan(plan, conf, hookContext));
-        addMapEntry(builder, OtherInfoType.QUERY, queryObj.toString());
-      } catch (Exception e) {
-        LOG.error("Unexpected exception while serializing json.", e);
-      }
-
       addMapEntry(builder, OtherInfoType.TEZ, Boolean.toString(tezTasks.size() > 0));
       addMapEntry(builder, OtherInfoType.MAPRED, Boolean.toString(mrTasks.size() > 0));
       addMapEntry(builder, OtherInfoType.SESSION_ID, hookContext.getSessionId());
@@ -417,14 +409,28 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       for (Map.Entry<String, String> setting : conf) {
         confObj.put(setting.getKey(), setting.getValue());
       }
-      addMapEntry(builder, OtherInfoType.CONF, confObj.toString());
-      return builder.build();
+      otherInfo.put(OtherInfoType.CONF, confObj);
+
+      ExplainConfiguration explainConfig = new ExplainConfiguration();
+      explainConfig.setFormatted(true);
+      ExplainWork explainWork = new ExplainWork(null, // resFile
+              null, // pCtx
+              plan.getRootTasks(), // RootTasks
+              plan.getFetchTask(), // FetchTask
+              null, null, // analyzer
+              explainConfig, // explainConfig
+              plan.getCboInfo(), // cboInfo,
+              plan.getOptimizedQueryString(),
+              plan.getOptimizedCBOPlan());
+      return new HiveHookEventProtoPartialBuilder(
+              builder, explainWork, otherInfo, plan.getQueryStr(), conf.getVar(ConfVars.HIVESTAGEIDREARRANGE));
     }
 
-    private HiveHookEventProto getPostHookEvent(HookContext hookContext, boolean success) {
+    private HiveHookEventProtoPartialBuilder getPostHookEvent(HookContext hookContext, boolean success) {
       QueryPlan plan = hookContext.getQueryPlan();
       LOG.info("Received post-hook notification for: " + plan.getQueryId());
 
+      Map<OtherInfoType, JSONObject> other = new HashMap<>();
       HiveHookEventProto.Builder builder = HiveHookEventProto.newBuilder();
       builder.setEventType(EventType.QUERY_COMPLETED.name());
       builder.setTimestamp(clock.getTime());
@@ -439,12 +445,11 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       for (String key : hookContext.getPerfLogger().getEndTimes().keySet()) {
         perfObj.put(key, hookContext.getPerfLogger().getDuration(key));
       }
-      addMapEntry(builder, OtherInfoType.PERF, perfObj.toString());
-
-      return builder.build();
+      other.put(OtherInfoType.PERF, perfObj);
+      return new HiveHookEventProtoPartialBuilder(builder, null, other, plan.getQueryStr(), null);
     }
 
-    private void addMapEntry(HiveHookEventProto.Builder builder, OtherInfoType key, String value) {
+    public static void addMapEntry(HiveHookEventProto.Builder builder, OtherInfoType key, String value) {
       if (value != null) {
         builder.addOtherInfo(
             MapFieldEntry.newBuilder().setKey(key.name()).setValue(value).build());
@@ -507,26 +512,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       }
     }
 
-    private JSONObject getExplainPlan(QueryPlan plan, HiveConf conf, HookContext hookContext)
-        throws Exception {
-      // Get explain plan for the query.
-      ExplainConfiguration config = new ExplainConfiguration();
-      config.setFormatted(true);
-      ExplainWork work = new ExplainWork(null, // resFile
-          null, // pCtx
-          plan.getRootTasks(), // RootTasks
-          plan.getFetchTask(), // FetchTask
-          null, null, // analyzer
-          config, // explainConfig
-          plan.getCboInfo(), // cboInfo,
-          plan.getOptimizedQueryString(),
-          plan.getOptimizedCBOPlan()
-      );
-      ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
-      explain.initialize(hookContext.getQueryState(), plan, null, null);
-      return explain.getJSONPlan(null, work);
-    }
-
     private ApplicationId determineLlapId(HiveConf conf, ExecutionMode mode) {
       // Note: for now, LLAP is only supported in Tez tasks. Will never come to MR; others may
       // be added here, although this is only necessary to have extra debug information.
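
Compared with the removed getExplainPlan() above, the new path in getPreHookEvent() builds the same ExplainWork but defers plan rendering. Two differences visible in the diff: TaskFactory.get() is now invoked with a null conf, and the ExplainTask is no longer initialize()d with the query state before rendering; that call is dropped along with getExplainPlan(). The event path is now two-phase, roughly (a sketch in terms of the types in this patch; the executor that puts writeEvent() on the logger thread is outside these hunks):

    import org.apache.hadoop.hive.ql.hooks.HiveHookEventProtoPartialBuilder;
    import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents;

    class TwoPhaseSketch {
      // getPreHookEvent()/getPostHookEvent() return this partial form on the
      // hook thread; build() is deferred to wherever writeEvent() executes.
      static HiveHookEvents.HiveHookEventProto finish(HiveHookEventProtoPartialBuilder partial) {
        return partial.build(); // every deferred JSONObject.toString() runs here
      }
    }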
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/StageIDsRearranger.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/StageIDsRearranger.java
index 6c87475..02bbf6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/StageIDsRearranger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/StageIDsRearranger.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashSet;
@@ -53,7 +52,7 @@ public class StageIDsRearranger implements PhysicalPlanResolver {
   }
 
   private static List<Task> getExplainOrder(PhysicalContext pctx) {
-    List<Task> tasks = getExplainOrder(pctx.getConf(), pctx.getRootTasks());
+    List<Task> tasks = getExplainOrder(pctx.getRootTasks(), pctx.getConf().getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE));
     if (pctx.getFetchTask() != null) {
       tasks.add(pctx.getFetchTask());
     }
@@ -76,12 +75,11 @@ public class StageIDsRearranger implements PhysicalPlanResolver {
     return sources;
   }
 
-  public static List<Task> getExplainOrder(HiveConf conf, List<Task<?>> tasks) {
+  public static List<Task> getExplainOrder(List<Task<?>> tasks, String stageIdRearrange) {
     for (Task<?> task : tasks) {
       task.setRootTask(true);
     }
-    String var = conf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE);
-    ArrangeType type = ArrangeType.valueOf(var.toUpperCase());
+    ArrangeType type = ArrangeType.valueOf(stageIdRearrange.toUpperCase());
     if (type == ArrangeType.EXECUTION) {
       return executionOrder(tasks);
     }
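
With the signature change above, getExplainOrder() no longer needs a live HiveConf; callers resolve HIVESTAGEIDREARRANGE once at the boundary and pass the string through. Note that ArrangeType.valueOf(stageIdRearrange.toUpperCase()) throws a NullPointerException on a null argument, so callers on the new path must always supply a value, as the call sites changed in this patch do. A sketch of the boundary split:

    import java.util.List;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;

    class ExplainOrderSketch {
      // Resolve the conf-dependent value once, on a thread that owns the conf...
      static String resolveMode(HiveConf conf) {
        return conf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE);
      }

      // ...after which the ordering itself is conf-free.
      static List<Task> order(List<Task<?>> tasks, String mode) {
        return StageIDsRearranger.getExplainOrder(tasks, mode);
      }
    }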
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveHookEventProtoPartialBuilder.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveHookEventProtoPartialBuilder.java
new file mode 100644
index 0000000..c6ab407
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveHookEventProtoPartialBuilder.java
@@ -0,0 +1,82 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents;
+import org.json.JSONObject;
+import org.junit.Test;
+
+public class TestHiveHookEventProtoPartialBuilder {
+  private static final String QUERY_1 = "query1";
+  private static final String HIVE = "hive";
+  private static final String LLAP = "llap";
+  private static final String TEZ = "tez";
+  private static final long TIMESTAMP = System.currentTimeMillis();
+
+  @Test
+  public void testEquality() {
+    JSONObject json = new JSONObject();
+    json.put("key1", "value1");
+    json.put("key2", "value2");
+    json.put("key3", "value3");
+    HiveHookEvents.HiveHookEventProto event1 = buildWithOtherInfo(json);
+    HiveHookEvents.HiveHookEventProto event2 = buildIn2Steps(json);
+    assertArrayEquals(event1.toByteArray(), event2.toByteArray());
+  }
+
+  private HiveHookEvents.HiveHookEventProto buildWithOtherInfo(JSONObject json) {
+    return HiveHookEvents.HiveHookEventProto
+            .newBuilder()
+            .setEventType(HiveProtoLoggingHook.EventType.QUERY_SUBMITTED.name())
+            .setTimestamp(TIMESTAMP)
+            .setHiveQueryId(QUERY_1)
+            .setUser(HIVE)
+            .setRequestUser(HIVE)
+            .setQueue(LLAP)
+            .setExecutionMode(TEZ)
+            .addAllOtherInfo(singletonList(HiveHookEvents.MapFieldEntry.newBuilder()
+                    .setKey(HiveProtoLoggingHook.OtherInfoType.CONF.name())
+                    .setValue(json.toString()).build()))
+            .build();
+  }
+
+  private HiveHookEvents.HiveHookEventProto buildIn2Steps(JSONObject json) {
+    HiveHookEvents.HiveHookEventProto.Builder builder = HiveHookEvents.HiveHookEventProto
+            .newBuilder()
+            .setEventType(HiveProtoLoggingHook.EventType.QUERY_SUBMITTED.name())
+            .setTimestamp(TIMESTAMP)
+            .setHiveQueryId(QUERY_1)
+            .setUser(HIVE)
+            .setRequestUser(HIVE)
+            .setQueue(LLAP)
+            .setExecutionMode(TEZ);
+    Map<HiveProtoLoggingHook.OtherInfoType, JSONObject> otherInfo = new HashMap<>();
+    otherInfo.put(HiveProtoLoggingHook.OtherInfoType.CONF, json);
+    return new HiveHookEventProtoPartialBuilder(builder, null, otherInfo, null, null).build();
+  }
+}
\ No newline at end of file
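
The test above pins down the intended equivalence: building the event in two steps (partial snapshot, then deferred otherInfo serialization) must be byte-for-byte identical to building it eagerly. One protobuf property to keep in mind when extending it, an observation about mergeFrom() semantics rather than something this patch exercises: merging appends repeated fields, so otherInfo entries serialized in build() come before any entries already set on the partial event. A hypothetical extra case for this test class (reusing its imports), under that assumption:

    // Assumed extension, not part of the patch: a partial event that already
    // carries a QUERY entry, plus a deferred CONF entry added in build().
    HiveHookEvents.HiveHookEventProto.Builder b = HiveHookEvents.HiveHookEventProto.newBuilder()
        .addOtherInfo(HiveHookEvents.MapFieldEntry.newBuilder()
            .setKey(HiveProtoLoggingHook.OtherInfoType.QUERY.name())
            .setValue("{}"));
    Map<HiveProtoLoggingHook.OtherInfoType, JSONObject> pending = new HashMap<>();
    pending.put(HiveProtoLoggingHook.OtherInfoType.CONF, new JSONObject());
    HiveHookEvents.HiveHookEventProto e =
        new HiveHookEventProtoPartialBuilder(b, null, pending, null, null).build();
    // Expected order: e.getOtherInfo(0) is CONF, e.getOtherInfo(1) is QUERY.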
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index add4b68..71de23b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -84,6 +84,7 @@ public class TestHiveProtoLoggingHook {
     QueryPlan queryPlan = new QueryPlan(HiveOperation.QUERY) {};
     queryPlan.setQueryId("test_queryId");
     queryPlan.setQueryStartTime(1234L);
+    queryPlan.setQueryString("SELECT * FROM t WHERE i > 10");
     queryPlan.setRootTasks(new ArrayList<>());
     queryPlan.setInputs(new HashSet<>());
     queryPlan.setOutputs(new HashSet<>());