You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/03/26 21:29:14 UTC
[1/2] hive git commit: HIVE-19021 : WM counters are not properly propagated from LLAP to AM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master fc3dd4e52 -> 2b1f8082e
HIVE-19021 : WM counters are not properly propagated from LLAP to AM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9714f302
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9714f302
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9714f302
Branch: refs/heads/master
Commit: 9714f3023c44e4f0326827ab919b306ca59bf5f3
Parents: fc3dd4e
Author: sergey <se...@apache.org>
Authored: Mon Mar 26 14:15:06 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Mar 26 14:15:06 2018 -0700
----------------------------------------------------------------------
.../hive/llap/counters/WmFragmentCounters.java | 23 +++++++++-------
.../llap/daemon/impl/ContainerRunnerImpl.java | 4 +--
.../hive/llap/daemon/impl/LlapTaskReporter.java | 28 ++++++++++++++------
.../llap/daemon/impl/TaskRunnerCallable.java | 3 ++-
4 files changed, 38 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9714f302/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
index 8287adb..e4dfe4e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hive.llap.counters;
+
import java.util.concurrent.atomic.AtomicLongArray;
+
import org.apache.tez.common.counters.TezCounters;
/**
@@ -29,11 +31,10 @@ public class WmFragmentCounters {
private LlapWmCounters currentCounter = null;
private long currentCounterStartTime = 0;
private final AtomicLongArray fixedCounters;
- private final TezCounters tezCounters;
- public WmFragmentCounters(final TezCounters tezCounters) {
+ public WmFragmentCounters() {
+ // Note: WmFragmentCounters are created before Tez counters are created.
this.fixedCounters = new AtomicLongArray(LlapWmCounters.values().length);
- this.tezCounters = tezCounters;
}
public void changeStateQueued(boolean isGuaranteed) {
@@ -86,6 +87,8 @@ public class WmFragmentCounters {
private void changeState(State newState, LlapWmCounters counter) {
+ // Note: there are so many different onSuccess/onFailure callbacks floating around that
+ // this will probably be called twice for the done state. This is ok given the sync.
long newTime = System.nanoTime();
long oldTime = -1;
LlapWmCounters oldCounter = null;
@@ -107,8 +110,14 @@ public class WmFragmentCounters {
private void incrCounter(LlapWmCounters counter, long delta) {
fixedCounters.addAndGet(counter.ordinal(), delta);
- if (tezCounters != null) {
- tezCounters.findCounter(LlapWmCounters.values()[counter.ordinal()]).increment(delta);
+ }
+
+ public void dumpToTezCounters(TezCounters tezCounters, boolean isLast) {
+ if (isLast) {
+ changeStateDone(); // Record the final counters.
+ }
+ for (int i = 0; i < fixedCounters.length(); ++i) {
+ tezCounters.findCounter(LlapWmCounters.values()[i]).setValue(fixedCounters.get(i));
}
}
@@ -126,8 +135,4 @@ public class WmFragmentCounters {
sb.append(" ]");
return sb.toString();
}
-
- public TezCounters getTezCounters() {
- return tezCounters;
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9714f302/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 8cd723d..ef5922e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -268,8 +268,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
Configuration callableConf = new Configuration(getConfig());
UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi();
boolean isGuaranteed = request.hasIsGuaranteed() && request.getIsGuaranteed();
- WmFragmentCounters wmCounters = new WmFragmentCounters(
- FragmentCountersMap.getCountersForFragment(fragmentId));
+ // TODO: ideally we'd register TezCounters here, but it seems impossible before registerTask.
+ WmFragmentCounters wmCounters = new WmFragmentCounters();
TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf,
new ExecutionContextImpl(localAddress.get().getHostName()), env,
credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
http://git-wip-us.apache.org/repos/asf/hive/blob/9714f302/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index b05e0b9..33ade55 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
+import org.apache.hadoop.hive.llap.counters.WmFragmentCounters;
import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.tez.common.counters.TezCounters;
@@ -92,10 +93,12 @@ public class LlapTaskReporter implements TaskReporterInterface {
@VisibleForTesting
HeartbeatCallable currentCallable;
+ private final WmFragmentCounters wmCounters;
+
public LlapTaskReporter(SchedulerFragmentCompletingListener completionListener, LlapTaskUmbilicalProtocol umbilical, long amPollInterval,
long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter,
String containerIdStr, final String fragmentId, TezEvent initialEvent,
- String fragmentRequestId) {
+ String fragmentRequestId, WmFragmentCounters wmCounters) {
this.umbilical = umbilical;
this.pollInterval = amPollInterval;
this.sendCounterInterval = sendCounterInterval;
@@ -109,6 +112,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
heartbeatExecutor = MoreExecutors.listeningDecorator(executor);
this.completionListener = completionListener;
this.fragmentRequestId = fragmentRequestId;
+ this.wmCounters = wmCounters;
}
/**
@@ -120,8 +124,9 @@ public class LlapTaskReporter implements TaskReporterInterface {
TezCounters tezCounters = task.addAndGetTezCounter(fragmentId);
FragmentCountersMap.registerCountersForFragment(fragmentId, tezCounters);
LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentId, task.getVertexName());
- currentCallable = new HeartbeatCallable(completionListener, task, umbilical, pollInterval, sendCounterInterval,
- maxEventsToGet, requestCounter, containerIdStr, initialEvent, fragmentRequestId);
+ currentCallable = new HeartbeatCallable(completionListener, task, umbilical, pollInterval,
+ sendCounterInterval, maxEventsToGet, requestCounter, containerIdStr, initialEvent,
+ fragmentRequestId, wmCounters);
ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
Futures.addCallback(future, new HeartbeatCallback(errorReporter));
}
@@ -170,6 +175,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
+ private final WmFragmentCounters wmCounters;
/*
* Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
@@ -188,7 +194,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
RuntimeTask task, LlapTaskUmbilicalProtocol umbilical,
long amPollInterval, long sendCounterInterval, int maxEventsToGet,
AtomicLong requestCounter, String containerIdStr,
- TezEvent initialEvent, String fragmentRequestId) {
+ TezEvent initialEvent, String fragmentRequestId, WmFragmentCounters wmCounters) {
this.pollInterval = amPollInterval;
this.sendCounterInterval = sendCounterInterval;
@@ -198,6 +204,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
this.initialEvent = initialEvent;
this.completionListener = completionListener;
this.fragmentRequestId = fragmentRequestId;
+ this.wmCounters = wmCounters;
this.task = task;
this.umbilical = umbilical;
@@ -275,7 +282,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
sendCounters = true;
prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get();
}
- updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), updateEventMetadata);
+ updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters, false), updateEventMetadata);
events.add(updateEvent);
}
@@ -378,7 +385,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
// Ensure only one final event is ever sent.
if (!finalEventQueued.getAndSet(true)) {
- TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
+ TezEvent statusUpdateEvent = new TezEvent(
+ getStatusUpdateEvent(true, true), updateEventMetadata);
TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
updateEventMetadata);
if (LOG.isDebugEnabled()) {
@@ -392,7 +400,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
}
}
- private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) {
+ private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters, boolean isLast) {
TezCounters counters = null;
TaskStatistics stats = null;
float progress = 0;
@@ -403,6 +411,9 @@ public class LlapTaskReporter implements TaskReporterInterface {
if (sendCounters) {
// send these potentially large objects at longer intervals to avoid overloading the AM
counters = task.getCounters();
+ if (wmCounters != null && counters != null) {
+ wmCounters.dumpToTezCounters(counters, isLast);
+ }
stats = task.getTaskStatistics();
}
}
@@ -444,7 +455,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
srcMeta == null ? updateEventMetadata : srcMeta));
}
try {
- tezEvents.add(new TezEvent(getStatusUpdateEvent(true), updateEventMetadata));
+ tezEvents.add(new TezEvent(
+ getStatusUpdateEvent(true, true), updateEventMetadata));
} catch (Exception e) {
// Counter may exceed limitation
LOG.warn("Error when get constructing TaskStatusUpdateEvent. Not sending it out");
http://git-wip-us.apache.org/repos/asf/hive/blob/9714f302/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index b484a13..7f436e2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -269,7 +269,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
request.getContainerIdString(),
fragmentId,
initialEvent,
- requestId);
+ requestId,
+ wmCounters);
String attemptId = fragmentInfo.getFragmentIdentifierString();
IOContextMap.setThreadAttemptId(attemptId);
[2/2] hive git commit: HIVE-19003 : metastoreconf logs too much on info level (Sergey Shelukhin, reviewed by Alan Gates)
Posted by se...@apache.org.
HIVE-19003 : metastoreconf logs too much on info level (Sergey Shelukhin, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2b1f8082
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2b1f8082
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2b1f8082
Branch: refs/heads/master
Commit: 2b1f8082e90e17bb6b48d486c1deacfdb06daf99
Parents: 9714f30
Author: sergey <se...@apache.org>
Authored: Mon Mar 26 14:16:39 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Mar 26 14:16:39 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/metastore/conf/MetastoreConf.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2b1f8082/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 9f82256..b8976ed 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1142,8 +1142,8 @@ public class MetastoreConf {
}
if (!beenDumped.getAndSet(true) && getBoolVar(conf, ConfVars.DUMP_CONFIG_ON_CREATION) &&
- LOG.isInfoEnabled()) {
- LOG.info(dumpConfig(conf));
+ LOG.isDebugEnabled()) {
+ LOG.debug(dumpConfig(conf));
}
return conf;
}