You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/23 22:20:22 UTC
hive git commit: HIVE-20772 : record per-task CPU counters in LLAP
(Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 0d4d03fd1 -> 72349bb33
HIVE-20772 : record per-task CPU counters in LLAP (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/72349bb3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/72349bb3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/72349bb3
Branch: refs/heads/master
Commit: 72349bb33988656ad43afdc0c7556532ee0dadbc
Parents: 0d4d03f
Author: sergey <se...@apache.org>
Authored: Tue Oct 23 15:16:17 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Tue Oct 23 15:16:17 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/llap/LlapUtil.java | 17 ++++++++
.../hive/llap/counters/LlapIOCounters.java | 4 +-
.../hive/llap/cache/LowLevelCacheCounters.java | 3 +-
.../llap/counters/QueryFragmentCounters.java | 10 ++++-
.../daemon/impl/StatsRecordingThreadPool.java | 42 +++++++++++++++----
.../hive/llap/io/api/impl/LlapRecordReader.java | 2 +-
.../llap/io/decode/EncodedDataConsumer.java | 43 +++++++++++++++++++-
.../llap/io/decode/OrcEncodedDataConsumer.java | 6 +--
.../llap/io/encoded/OrcEncodedDataReader.java | 10 ++---
.../llap/io/encoded/SerDeEncodedDataReader.java | 2 +-
10 files changed, 115 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 50c0e22..82776ab 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -14,6 +14,8 @@
package org.apache.hadoop.hive.llap;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -366,4 +368,19 @@ public class LlapUtil {
return p.startsWith(BASE_PREFIX) || p.startsWith(DELTA_PREFIX) || p.startsWith(BUCKET_PREFIX)
|| p.startsWith(UNION_SUDBIR_PREFIX) || p.startsWith(DELETE_DELTA_PREFIX);
}
+
+
+ public static ThreadMXBean initThreadMxBean() {
+ ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+ if (mxBean != null) {
+ if (!mxBean.isCurrentThreadCpuTimeSupported()) {
+ LOG.warn("Thread CPU monitoring is not supported");
+ return null;
+ } else if (!mxBean.isThreadCpuTimeEnabled()) {
+ LOG.warn("Thread CPU monitoring is not enabled");
+ return null;
+ }
+ }
+ return mxBean;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
index 059d5b9..d27193f 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
@@ -36,7 +36,9 @@ public enum LlapIOCounters {
TOTAL_IO_TIME_NS(false),
DECODE_TIME_NS(false),
HDFS_TIME_NS(false),
- CONSUMER_TIME_NS(false);
+ CONSUMER_TIME_NS(false),
+ IO_CPU_NS(false),
+ IO_USER_NS(false);
// flag to indicate if these counters are subject to change across different test runs
private boolean testSafe;
http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
index 91df036..c2aca5a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
@@ -21,6 +21,7 @@ public interface LowLevelCacheCounters {
void recordCacheHit(long bytesHit);
void recordCacheMiss(long bytesMissed);
void recordAllocBytes(long bytesWasted, long bytesAllocated);
- void recordHdfsTime(long timeUs);
+ void recordHdfsTime(long timeNs);
+ void recordThreadTimes(long cpuNs, long userNs);
long startTimeCounter();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
index be4dfad..f5f2982 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
@@ -71,7 +71,7 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
return (doUseTimeCounters ? System.nanoTime() : 0);
}
- public void incrTimeCounter(LlapIOCounters counter, long startTime) {
+ public void incrWallClockCounter(LlapIOCounters counter, long startTime) {
if (!doUseTimeCounters) return;
long delta = System.nanoTime() - startTime;
fixedCounters.addAndGet(counter.ordinal(), delta);
@@ -109,7 +109,13 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
@Override
public void recordHdfsTime(long startTime) {
- incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+ incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+ }
+
+ @Override
+ public void recordThreadTimes(long cpuNs, long userNs) {
+ incrCounter(LlapIOCounters.IO_CPU_NS, cpuNs);
+ incrCounter(LlapIOCounters.IO_USER_NS, userNs);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
index 27462e1..873bdc7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
@@ -17,7 +17,10 @@
*/
package org.apache.hadoop.hive.llap.daemon.impl;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Stack;
@@ -31,8 +34,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource;
-import org.apache.log4j.NDC;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounters;
@@ -50,6 +53,7 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
private static final Logger LOG = LoggerFactory.getLogger(StatsRecordingThreadPool.class);
// uncaught exception handler that will be set for all threads before execution
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+ private final ThreadMXBean mxBean;
public StatsRecordingThreadPool(final int corePoolSize, final int maximumPoolSize,
final long keepAliveTime,
@@ -66,11 +70,12 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
final ThreadFactory threadFactory, Thread.UncaughtExceptionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.uncaughtExceptionHandler = handler;
+ this.mxBean = LlapUtil.initThreadMxBean();
}
@Override
protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
- return new FutureTask(new WrappedCallable(callable, uncaughtExceptionHandler));
+ return new FutureTask(new WrappedCallable(callable, uncaughtExceptionHandler, mxBean));
}
public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) {
@@ -86,11 +91,13 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
private static class WrappedCallable<V> implements Callable<V> {
private Callable<V> actualCallable;
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+ private ThreadMXBean mxBean;
WrappedCallable(final Callable<V> callable,
- final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+ final Thread.UncaughtExceptionHandler uncaughtExceptionHandler, ThreadMXBean mxBean) {
this.actualCallable = callable;
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+ this.mxBean = mxBean;
}
@Override
@@ -104,12 +111,18 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
// clone thread local file system statistics
List<LlapUtil.StatisticsData> statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics();
-
+ long cpuTime = mxBean == null ? -1 : mxBean.getCurrentThreadCpuTime(),
+ userTime = mxBean == null ? -1 : mxBean.getCurrentThreadUserTime();
setupMDCFromNDC(actualCallable);
try {
return actualCallable.call();
} finally {
- updateFileSystemCounters(statsBefore, actualCallable);
+ if (mxBean != null) {
+ cpuTime = mxBean.getCurrentThreadCpuTime() - cpuTime;
+ userTime = mxBean.getCurrentThreadUserTime() - userTime;
+ }
+ updateCounters(statsBefore, actualCallable, cpuTime, userTime);
+
MDC.clear();
}
}
@@ -148,8 +161,17 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
}
}
- private void updateFileSystemCounters(final List<LlapUtil.StatisticsData> statsBefore,
- final Callable<V> actualCallable) {
+ /**
+ * LLAP executor related counters (per-task CPU and user time, in nanoseconds).
+ */
+ public enum LlapExecutorCounters {
+ EXECUTOR_CPU_NS,
+ EXECUTOR_USER_NS;
+
+ }
+
+ private void updateCounters(final List<LlapUtil.StatisticsData> statsBefore,
+ final Callable<V> actualCallable, long cpuTime, long userTime) {
Thread thread = Thread.currentThread();
TezCounters tezCounters = null;
// add tez counters for task execution and llap io
@@ -160,9 +182,15 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
} else if (actualCallable instanceof TezCounterSource) {
// Other counter sources (currently used in LLAP IO).
tezCounters = ((TezCounterSource) actualCallable).getTezCounters();
+ } else {
+ LOG.warn("Unexpected callable {}; cannot get counters", actualCallable);
}
if (tezCounters != null) {
+ if (cpuTime >= 0 && userTime >= 0) {
+ tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_CPU_NS).increment(cpuTime);
+ tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_USER_NS).increment(userTime);
+ }
if (statsBefore != null) {
// if there are multiple stats for the same scheme (from different NameNode), this
// method will squash them together
http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 27a5b0f..9ef7af4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -338,7 +338,7 @@ class LlapRecordReader
if (wasFirst) {
firstReturnTime = counters.startTimeCounter();
}
- counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
+ counters.incrWallClockCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
return false;
}
if (isAcidScan) {
http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index d5c2d48..f2d2832 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -17,19 +17,26 @@
*/
package org.apache.hadoop.hive.llap.io.decode;
+import java.lang.management.ThreadMXBean;
import java.util.concurrent.Callable;
import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
import org.apache.hive.common.util.FixedSizedObjectPool;
import org.apache.orc.TypeDescription;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.task.TaskRunner2Callable;
public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedColumnBatch<BatchKey>>
implements Consumer<BatchType>, ReadPipeline {
@@ -41,11 +48,14 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
// Note that the pool is per EDC - within EDC, CVBs are expected to have the same schema.
private static final int CVB_POOL_SIZE = 128;
protected final FixedSizedObjectPool<ColumnVectorBatch> cvbPool;
+ protected final QueryFragmentCounters counters;
+ private final ThreadMXBean mxBean;
public EncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, final int colCount,
- LlapDaemonIOMetrics ioMetrics) {
+ LlapDaemonIOMetrics ioMetrics, QueryFragmentCounters counters) {
this.downstreamConsumer = consumer;
this.ioMetrics = ioMetrics;
+ this.mxBean = LlapUtil.initThreadMxBean();
cvbPool = new FixedSizedObjectPool<ColumnVectorBatch>(CVB_POOL_SIZE,
new Pool.PoolObjectHelper<ColumnVectorBatch>() {
@Override
@@ -57,12 +67,41 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
// Don't reset anything, we are reusing column vectors.
}
});
+ this.counters = counters;
+ }
+
+ // Implementing TezCounterSource is needed so that StatsRecordingThreadPool can get the counters.
+ private class CpuRecordingCallable implements Callable<Void>, TezCounterSource {
+ private final Callable<Void> readCallable;
+
+ public CpuRecordingCallable(Callable<Void> readCallable) {
+ this.readCallable = readCallable;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ long cpuTime = mxBean.getCurrentThreadCpuTime(),
+ userTime = mxBean.getCurrentThreadUserTime();
+ try {
+ return readCallable.call();
+ } finally {
+ counters.recordThreadTimes(mxBean.getCurrentThreadCpuTime() - cpuTime,
+ mxBean.getCurrentThreadUserTime() - userTime);
+ }
+ }
+
+ @Override
+ public TezCounters getTezCounters() {
+ return (readCallable instanceof TezCounterSource)
+ ? ((TezCounterSource) readCallable).getTezCounters() : null;
+ }
+
}
public void init(ConsumerFeedback<BatchType> upstreamFeedback,
Callable<Void> readCallable) {
this.upstreamFeedback = upstreamFeedback;
- this.readCallable = readCallable;
+ this.readCallable = mxBean == null ? readCallable : new CpuRecordingCallable(readCallable);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 40248a3..83931c2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -69,7 +69,6 @@ public class OrcEncodedDataConsumer
private CompressionCodec codec;
private List<ConsumerStripeMetadata> stripes;
private final boolean skipCorrupt; // TODO: get rid of this
- private final QueryFragmentCounters counters;
private SchemaEvolution evolution;
private IoTrace trace;
private final Includes includes;
@@ -79,11 +78,10 @@ public class OrcEncodedDataConsumer
public OrcEncodedDataConsumer(
Consumer<ColumnVectorBatch> consumer, Includes includes, boolean skipCorrupt,
QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics) {
- super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics);
+ super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics, counters);
this.includes = includes;
// TODO: get rid of this
this.skipCorrupt = skipCorrupt;
- this.counters = counters;
}
public void setUseDecimal64ColumnVectors(final boolean useDecimal64ColumnVectors) {
@@ -209,7 +207,7 @@ public class OrcEncodedDataConsumer
counters.incrCounter(LlapIOCounters.ROWS_EMITTED, batchSize);
}
LlapIoImpl.ORC_LOGGER.debug("Done with decode");
- counters.incrTimeCounter(LlapIOCounters.DECODE_TIME_NS, startTime);
+ counters.incrWallClockCounter(LlapIOCounters.DECODE_TIME_NS, startTime);
counters.incrCounter(LlapIOCounters.NUM_VECTOR_BATCHES, maxBatchesRG);
counters.incrCounter(LlapIOCounters.NUM_DECODED_BATCHES);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 4f5b0a9..74cee64 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -443,7 +443,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
}
private void recordReaderTime(long startTime) {
- counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
+ counters.incrWallClockCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
}
private void validateFileMetadata() throws IOException {
@@ -519,7 +519,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
}
}
orcReader = EncodedOrcFile.createReader(path, opts);
- counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+ counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
}
/**
@@ -677,7 +677,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
DiskRangeList footerRange = rawDataReader.readFileData(
new DiskRangeList(offset, offset + si.getFooterLength()), 0, false);
// LOG.error("Got " + RecordReaderUtils.stringifyDiskRanges(footerRange));
- counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+ counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
assert footerRange.next == null; // Can only happens w/zcr for a single input buffer.
if (hasCache) {
LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(
@@ -716,7 +716,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
if (!isRawDataReaderOpen && isOpen) {
long startTime = counters.startTimeCounter();
rawDataReader.open();
- counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+ counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
}
return;
}
@@ -734,7 +734,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
rawDataReader.open();
isRawDataReaderOpen = true;
}
- counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+ counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 658bc7d..a5671e9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -1688,7 +1688,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
private void recordReaderTime(long startTime) {
- counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
+ counters.incrWallClockCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
}
private boolean processStop() {