You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ab...@apache.org on 2022/07/02 06:34:26 UTC

[hive] branch master updated: HIVE-26182: Some improvements to make DPP more debuggable (#3257) (Laszlo Bodor reviewed by Rajesh Balamohan)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 72ac4f506b HIVE-26182: Some improvements to make DPP more debuggable (#3257) (Laszlo Bodor reviewed by Rajesh Balamohan)
72ac4f506b is described below

commit 72ac4f506b0d06c5db9776777782af72b4ee9bb3
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Sat Jul 2 08:34:13 2022 +0200

    HIVE-26182: Some improvements to make DPP more debuggable (#3257) (Laszlo Bodor reviewed by Rajesh Balamohan)
---
 .../hive/ql/exec/tez/DynamicPartitionPruner.java   | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
index c04c3557a1..e75578cc27 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
@@ -126,7 +126,7 @@ public class DynamicPartitionPruner {
       context.registerForVertexStateUpdates(source, states);
     }
 
-    LOG.info("Waiting for events (" + sourceInfoCount + " sources) ...");
+    LOG.info("Waiting for events ({} sources) ...", sourceInfoCount);
     // synchronous event processing loop. Won't return until all events have
     // been processed.
     this.processEvents();
@@ -210,7 +210,7 @@ public class DynamicPartitionPruner {
       String source = entry.getKey();
       for (SourceInfo si : entry.getValue()) {
         int taskNum = context.getVertexNumTasks(source);
-        LOG.info("Expecting " + taskNum + " events for vertex " + source + ", for column " + si.columnName);
+        LOG.info("Expecting {} events for vertex {}, for column {}", taskNum, source, si.columnName);
         expectedEvents += taskNum;
         ExprNodeDesc prunerExpr = prunePartitionSingleSource(jobConf, source, si);
         if (prunerExpr != null) {
@@ -235,7 +235,7 @@ public class DynamicPartitionPruner {
 
     // sanity check. all tasks must submit events for us to succeed.
     if (expectedEvents != totalEventCount) {
-      LOG.error("Expecting: " + expectedEvents + ", received: " + totalEventCount);
+      LOG.error("Expecting: {} events, received: {}", expectedEvents, totalEventCount);
       throw new HiveException("Incorrect event count in dynamic partition pruning");
     }
   }
@@ -247,7 +247,7 @@ public class DynamicPartitionPruner {
     if (si.skipPruning.get()) {
       // in this case we've determined that there's too much data
       // to prune dynamically.
-      LOG.info("Skip pruning on " + source + ", column " + si.columnName);
+      LOG.info("Skip pruning on {}, column {}", source, si.columnName);
       return null;
     }
 
@@ -327,7 +327,7 @@ public class DynamicPartitionPruner {
       LOG.debug("part key expr applied: {}", partValue);
 
       if (!values.contains(partValue) && (!mustKeepOnePartition || work.getPathToPartitionInfo().size() > 1)) {
-        LOG.info("Pruning path: " + p);
+        LOG.info("Pruning path: {}", p);
         it.remove();
         // work.removePathToPartitionInfo(p);
         work.removePathToAlias(p);
@@ -390,7 +390,7 @@ public class DynamicPartitionPruner {
       this.deserializer = serDe;
 
       ObjectInspector inspector = deserializer.getObjectInspector();
-      LOG.debug("Type of obj insp: " + inspector.getTypeName());
+      LOG.debug("Type of obj insp: {}", inspector.getTypeName());
 
       soi = (StructObjectInspector) inspector;
       List<? extends StructField> fields = soi.getAllStructFieldRefs();
@@ -419,12 +419,13 @@ public class DynamicPartitionPruner {
         }
       } else {
         InputInitializerEvent event = (InputInitializerEvent) element;
+        ByteBuffer payload = event.getUserPayload();
         numEventsSeenPerSource.computeIfAbsent(event.getSourceVertexName(), vn -> new MutableInt(0))
             .increment();
 
         totalEventCount++;
-        LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
-            + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
+        LOG.info("Input event ({} -> {} {}), event payload size: {}", event.getSourceVertexName(),
+            event.getTargetVertexName(), event.getTargetInputName(), (payload.limit() - payload.position()));
         processPayload(event.getUserPayload(), event.getSourceVertexName());
         eventCount += 1;
         if (checkForSourceCompletion(event.getSourceVertexName())) {
@@ -432,10 +433,9 @@ public class DynamicPartitionPruner {
         }
       }
     }
-    LOG.info("Received events: " + eventCount);
+    LOG.info("Received events: {}", eventCount);
   }
 
-  @SuppressWarnings("deprecation")
   @VisibleForTesting
   protected String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
       IOException {
@@ -470,6 +470,7 @@ public class DynamicPartitionPruner {
         if (skip) {
           info.skipPruning.set(true);
         } else {
+          int partitionCount = 0;
           while (payload.hasRemaining()) {
             writable.readFields(in);
 
@@ -481,7 +482,9 @@ public class DynamicPartitionPruner {
             LOG.debug("Adding: {} to list of required partitions", value);
 
             info.values.add(value);
+            partitionCount++;
           }
+          LOG.info("Received {} partitions (source: {}, column: {})", partitionCount, sourceName, columnName);
         }
       }
     } finally {
@@ -536,7 +539,7 @@ public class DynamicPartitionPruner {
   }
 
   public void processVertex(String name) {
-    LOG.info("Vertex succeeded: " + name);
+    LOG.info("Vertex succeeded: {}", name);
     finishedVertices.add(name);
     queue.offer(VERTEX_FINISH_TOKEN);
   }
@@ -554,7 +557,7 @@ public class DynamicPartitionPruner {
       if (sourcesWaitingForEvents.isEmpty()) {
         return true;
       } else {
-        LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " sources.");
+        LOG.info("Waiting for {} sources.", sourcesWaitingForEvents.size());
         return false;
       }
     } else if (processedEvents > expectedEvents) {