You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this text version.
Posted to commits@hive.apache.org by ab...@apache.org on 2022/07/02 06:34:26 UTC
[hive] branch master updated: HIVE-26182: Some improvements to make DPP more debuggable (#3257) (Laszlo Bodor reviewed by Rajesh Balamohan)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 72ac4f506b HIVE-26182: Some improvements to make DPP more debuggable (#3257) (Laszlo Bodor reviewed by Rajesh Balamohan)
72ac4f506b is described below
commit 72ac4f506b0d06c5db9776777782af72b4ee9bb3
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Sat Jul 2 08:34:13 2022 +0200
HIVE-26182: Some improvements to make DPP more debuggable (#3257) (Laszlo Bodor reviewed by Rajesh Balamohan)
---
.../hive/ql/exec/tez/DynamicPartitionPruner.java | 27 ++++++++++++----------
1 file changed, 15 insertions(+), 12 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
index c04c3557a1..e75578cc27 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
@@ -126,7 +126,7 @@ public class DynamicPartitionPruner {
context.registerForVertexStateUpdates(source, states);
}
- LOG.info("Waiting for events (" + sourceInfoCount + " sources) ...");
+ LOG.info("Waiting for events ({} sources) ...", sourceInfoCount);
// synchronous event processing loop. Won't return until all events have
// been processed.
this.processEvents();
@@ -210,7 +210,7 @@ public class DynamicPartitionPruner {
String source = entry.getKey();
for (SourceInfo si : entry.getValue()) {
int taskNum = context.getVertexNumTasks(source);
- LOG.info("Expecting " + taskNum + " events for vertex " + source + ", for column " + si.columnName);
+ LOG.info("Expecting {} events for vertex {}, for column {}", taskNum, source, si.columnName);
expectedEvents += taskNum;
ExprNodeDesc prunerExpr = prunePartitionSingleSource(jobConf, source, si);
if (prunerExpr != null) {
@@ -235,7 +235,7 @@ public class DynamicPartitionPruner {
// sanity check. all tasks must submit events for us to succeed.
if (expectedEvents != totalEventCount) {
- LOG.error("Expecting: " + expectedEvents + ", received: " + totalEventCount);
+ LOG.error("Expecting: {} events, received: {}", expectedEvents, totalEventCount);
throw new HiveException("Incorrect event count in dynamic partition pruning");
}
}
@@ -247,7 +247,7 @@ public class DynamicPartitionPruner {
if (si.skipPruning.get()) {
// in this case we've determined that there's too much data
// to prune dynamically.
- LOG.info("Skip pruning on " + source + ", column " + si.columnName);
+ LOG.info("Skip pruning on {}, column {}", source, si.columnName);
return null;
}
@@ -327,7 +327,7 @@ public class DynamicPartitionPruner {
LOG.debug("part key expr applied: {}", partValue);
if (!values.contains(partValue) && (!mustKeepOnePartition || work.getPathToPartitionInfo().size() > 1)) {
- LOG.info("Pruning path: " + p);
+ LOG.info("Pruning path: {}", p);
it.remove();
// work.removePathToPartitionInfo(p);
work.removePathToAlias(p);
@@ -390,7 +390,7 @@ public class DynamicPartitionPruner {
this.deserializer = serDe;
ObjectInspector inspector = deserializer.getObjectInspector();
- LOG.debug("Type of obj insp: " + inspector.getTypeName());
+ LOG.debug("Type of obj insp: {}", inspector.getTypeName());
soi = (StructObjectInspector) inspector;
List<? extends StructField> fields = soi.getAllStructFieldRefs();
@@ -419,12 +419,13 @@ public class DynamicPartitionPruner {
}
} else {
InputInitializerEvent event = (InputInitializerEvent) element;
+ ByteBuffer payload = event.getUserPayload();
numEventsSeenPerSource.computeIfAbsent(event.getSourceVertexName(), vn -> new MutableInt(0))
.increment();
totalEventCount++;
- LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
- + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
+ LOG.info("Input event ({} -> {} {}), event payload size: {}", event.getSourceVertexName(),
+ event.getTargetVertexName(), event.getTargetInputName(), (payload.limit() - payload.position()));
processPayload(event.getUserPayload(), event.getSourceVertexName());
eventCount += 1;
if (checkForSourceCompletion(event.getSourceVertexName())) {
@@ -432,10 +433,9 @@ public class DynamicPartitionPruner {
}
}
}
- LOG.info("Received events: " + eventCount);
+ LOG.info("Received events: {}", eventCount);
}
- @SuppressWarnings("deprecation")
@VisibleForTesting
protected String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
IOException {
@@ -470,6 +470,7 @@ public class DynamicPartitionPruner {
if (skip) {
info.skipPruning.set(true);
} else {
+ int partitionCount = 0;
while (payload.hasRemaining()) {
writable.readFields(in);
@@ -481,7 +482,9 @@ public class DynamicPartitionPruner {
LOG.debug("Adding: {} to list of required partitions", value);
info.values.add(value);
+ partitionCount++;
}
+ LOG.info("Received {} partitions (source: {}, column: {})", partitionCount, sourceName, columnName);
}
}
} finally {
@@ -536,7 +539,7 @@ public class DynamicPartitionPruner {
}
public void processVertex(String name) {
- LOG.info("Vertex succeeded: " + name);
+ LOG.info("Vertex succeeded: {}", name);
finishedVertices.add(name);
queue.offer(VERTEX_FINISH_TOKEN);
}
@@ -554,7 +557,7 @@ public class DynamicPartitionPruner {
if (sourcesWaitingForEvents.isEmpty()) {
return true;
} else {
- LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " sources.");
+ LOG.info("Waiting for {} sources.", sourcesWaitingForEvents.size());
return false;
}
} else if (processedEvents > expectedEvents) {