You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/10/23 22:20:22 UTC

hive git commit: HIVE-20772 : record per-task CPU counters in LLAP (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 0d4d03fd1 -> 72349bb33


HIVE-20772 : record per-task CPU counters in LLAP (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/72349bb3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/72349bb3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/72349bb3

Branch: refs/heads/master
Commit: 72349bb33988656ad43afdc0c7556532ee0dadbc
Parents: 0d4d03f
Author: sergey <se...@apache.org>
Authored: Tue Oct 23 15:16:17 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Tue Oct 23 15:16:17 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/llap/LlapUtil.java   | 17 ++++++++
 .../hive/llap/counters/LlapIOCounters.java      |  4 +-
 .../hive/llap/cache/LowLevelCacheCounters.java  |  3 +-
 .../llap/counters/QueryFragmentCounters.java    | 10 ++++-
 .../daemon/impl/StatsRecordingThreadPool.java   | 42 +++++++++++++++----
 .../hive/llap/io/api/impl/LlapRecordReader.java |  2 +-
 .../llap/io/decode/EncodedDataConsumer.java     | 43 +++++++++++++++++++-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |  6 +--
 .../llap/io/encoded/OrcEncodedDataReader.java   | 10 ++---
 .../llap/io/encoded/SerDeEncodedDataReader.java |  2 +-
 10 files changed, 115 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 50c0e22..82776ab 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -14,6 +14,8 @@
 package org.apache.hadoop.hive.llap;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -366,4 +368,19 @@ public class LlapUtil {
     return p.startsWith(BASE_PREFIX) || p.startsWith(DELTA_PREFIX) || p.startsWith(BUCKET_PREFIX)
         || p.startsWith(UNION_SUDBIR_PREFIX) || p.startsWith(DELETE_DELTA_PREFIX);
   }
+
+
+  public static ThreadMXBean initThreadMxBean() { // Returns the JVM ThreadMXBean, or null if thread CPU timing is unsupported or disabled.
+    ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+    if (mxBean != null) {
+      if (!mxBean.isCurrentThreadCpuTimeSupported()) {
+        LOG.warn("Thread CPU monitoring is not supported");
+        return null; // Callers in this patch check for null and skip CPU accounting.
+      } else if (!mxBean.isThreadCpuTimeEnabled()) {
+        LOG.warn("Thread CPU monitoring is not enabled");
+        return null;
+      }
+    }
+    return mxBean;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
index 059d5b9..d27193f 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
@@ -36,7 +36,9 @@ public enum LlapIOCounters {
   TOTAL_IO_TIME_NS(false),
   DECODE_TIME_NS(false),
   HDFS_TIME_NS(false),
-  CONSUMER_TIME_NS(false);
+  CONSUMER_TIME_NS(false),
+  IO_CPU_NS(false),
+  IO_USER_NS(false);
 
   // flag to indicate if these counters are subject to change across different test runs
   private boolean testSafe;

http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
index 91df036..c2aca5a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
@@ -21,6 +21,7 @@ public interface LowLevelCacheCounters {
   void recordCacheHit(long bytesHit);
   void recordCacheMiss(long bytesMissed);
   void recordAllocBytes(long bytesWasted, long bytesAllocated);
-  void recordHdfsTime(long timeUs);
+  void recordHdfsTime(long timeNs);
+  void recordThreadTimes(long cpuNs, long userNs);
   long startTimeCounter();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
index be4dfad..f5f2982 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
@@ -71,7 +71,7 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
     return (doUseTimeCounters ? System.nanoTime() : 0);
   }
 
-  public void incrTimeCounter(LlapIOCounters counter, long startTime) {
+  public void incrWallClockCounter(LlapIOCounters counter, long startTime) {
     if (!doUseTimeCounters) return;
     long delta = System.nanoTime() - startTime;
     fixedCounters.addAndGet(counter.ordinal(), delta);
@@ -109,7 +109,13 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
 
   @Override
   public void recordHdfsTime(long startTime) {
-    incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+    incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+  }
+
+  @Override
+  public void recordThreadTimes(long cpuNs, long userNs) { // Forwards cpuNs to IO_CPU_NS and userNs to IO_USER_NS.
+    incrCounter(LlapIOCounters.IO_CPU_NS, cpuNs);
+    incrCounter(LlapIOCounters.IO_USER_NS, userNs);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
index 27462e1..873bdc7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
@@ -31,8 +34,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource;
-import org.apache.log4j.NDC;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.counters.FileSystemCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -50,6 +53,7 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
   private static final Logger LOG = LoggerFactory.getLogger(StatsRecordingThreadPool.class);
   // uncaught exception handler that will be set for all threads before execution
   private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+  private final ThreadMXBean mxBean;
 
   public StatsRecordingThreadPool(final int corePoolSize, final int maximumPoolSize,
       final long keepAliveTime,
@@ -66,11 +70,12 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
       final ThreadFactory threadFactory, Thread.UncaughtExceptionHandler handler) {
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
     this.uncaughtExceptionHandler = handler;
+    this.mxBean = LlapUtil.initThreadMxBean();
   }
 
   @Override
   protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
-    return new FutureTask(new WrappedCallable(callable, uncaughtExceptionHandler));
+    return new FutureTask(new WrappedCallable(callable, uncaughtExceptionHandler, mxBean));
   }
 
   public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) {
@@ -86,11 +91,13 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
   private static class WrappedCallable<V> implements Callable<V> {
     private Callable<V> actualCallable;
     private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+    private ThreadMXBean mxBean;
 
     WrappedCallable(final Callable<V> callable,
-        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler, ThreadMXBean mxBean) {
       this.actualCallable = callable;
       this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+      this.mxBean = mxBean;
     }
 
     @Override
@@ -104,12 +111,18 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
 
       // clone thread local file system statistics
       List<LlapUtil.StatisticsData> statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics();
-
+      long cpuTime = mxBean == null ? -1 : mxBean.getCurrentThreadCpuTime(),
+          userTime = mxBean == null ? -1 : mxBean.getCurrentThreadUserTime();
       setupMDCFromNDC(actualCallable);
       try {
         return actualCallable.call();
       } finally {
-        updateFileSystemCounters(statsBefore, actualCallable);
+        if (mxBean != null) {
+          cpuTime = mxBean.getCurrentThreadCpuTime() - cpuTime;
+          userTime = mxBean.getCurrentThreadUserTime() - userTime;
+        }
+        updateCounters(statsBefore, actualCallable, cpuTime, userTime);
+
         MDC.clear();
       }
     }
@@ -148,8 +161,17 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
       }
     }
 
-    private void updateFileSystemCounters(final List<LlapUtil.StatisticsData> statsBefore,
-        final Callable<V> actualCallable) {
+    /**
+     * Executor-side thread time counters; updateCounters increments them with the deltas measured around the wrapped callable.
+     */
+    public enum LlapExecutorCounters {
+      EXECUTOR_CPU_NS, // Delta of ThreadMXBean.getCurrentThreadCpuTime() across the task.
+      EXECUTOR_USER_NS; // Delta of ThreadMXBean.getCurrentThreadUserTime() across the task.
+
+    }
+
+    private void updateCounters(final List<LlapUtil.StatisticsData> statsBefore,
+        final Callable<V> actualCallable, long cpuTime, long userTime) {
       Thread thread = Thread.currentThread();
       TezCounters tezCounters = null;
       // add tez counters for task execution and llap io
@@ -160,9 +182,15 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
       } else if (actualCallable instanceof TezCounterSource) {
         // Other counter sources (currently used in LLAP IO).
         tezCounters = ((TezCounterSource) actualCallable).getTezCounters();
+      } else {
+        LOG.warn("Unexpected callable {}; cannot get counters", actualCallable);
       }
 
       if (tezCounters != null) {
+        if (cpuTime >= 0 && userTime >= 0) {
+          tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_CPU_NS).increment(cpuTime);
+          tezCounters.findCounter(LlapExecutorCounters.EXECUTOR_USER_NS).increment(userTime);
+        }
         if (statsBefore != null) {
           // if there are multiple stats for the same scheme (from different NameNode), this
           // method will squash them together

http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 27a5b0f..9ef7af4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -338,7 +338,7 @@ class LlapRecordReader
       if (wasFirst) {
         firstReturnTime = counters.startTimeCounter();
       }
-      counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
+      counters.incrWallClockCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
       return false;
     }
     if (isAcidScan) {

http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index d5c2d48..f2d2832 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -17,19 +17,26 @@
  */
 package org.apache.hadoop.hive.llap.io.decode;
 
+import java.lang.management.ThreadMXBean;
 import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
 import org.apache.hive.common.util.FixedSizedObjectPool;
 import org.apache.orc.TypeDescription;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.task.TaskRunner2Callable;
 
 public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedColumnBatch<BatchKey>>
   implements Consumer<BatchType>, ReadPipeline {
@@ -41,11 +48,14 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
   // Note that the pool is per EDC - within EDC, CVBs are expected to have the same schema.
   private static final int CVB_POOL_SIZE = 128;
   protected final FixedSizedObjectPool<ColumnVectorBatch> cvbPool;
+  protected final QueryFragmentCounters counters;
+  private final ThreadMXBean mxBean;
 
   public EncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, final int colCount,
-      LlapDaemonIOMetrics ioMetrics) {
+      LlapDaemonIOMetrics ioMetrics, QueryFragmentCounters counters) {
     this.downstreamConsumer = consumer;
     this.ioMetrics = ioMetrics;
+    this.mxBean = LlapUtil.initThreadMxBean();
     cvbPool = new FixedSizedObjectPool<ColumnVectorBatch>(CVB_POOL_SIZE,
         new Pool.PoolObjectHelper<ColumnVectorBatch>() {
           @Override
@@ -57,12 +67,41 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
             // Don't reset anything, we are reusing column vectors.
           }
         });
+    this.counters = counters;
+  }
+
+  // Records thread CPU/user time around the read callable. Implements TezCounterSource so StatsRecordingThreadPool can still get the delegate's TezCounters.
+  private class CpuRecordingCallable implements Callable<Void>, TezCounterSource {
+    private final Callable<Void> readCallable;
+
+    public CpuRecordingCallable(Callable<Void> readCallable) {
+      this.readCallable = readCallable;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      long cpuTime = mxBean.getCurrentThreadCpuTime(), // mxBean is non-null here: init() only wraps when it is.
+          userTime = mxBean.getCurrentThreadUserTime();
+      try {
+        return readCallable.call();
+      } finally { // Record the deltas even when the delegate throws.
+        counters.recordThreadTimes(mxBean.getCurrentThreadCpuTime() - cpuTime,
+            mxBean.getCurrentThreadUserTime() - userTime);
+      }
+    }
+
+    @Override
+    public TezCounters getTezCounters() { // Returns null when the delegate is not itself a TezCounterSource.
+      return (readCallable instanceof TezCounterSource)
+          ? ((TezCounterSource) readCallable).getTezCounters() : null;
+    }
+
   }
 
   public void init(ConsumerFeedback<BatchType> upstreamFeedback,
       Callable<Void> readCallable) {
     this.upstreamFeedback = upstreamFeedback;
-    this.readCallable = readCallable;
+    this.readCallable = mxBean == null ? readCallable : new CpuRecordingCallable(readCallable);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 40248a3..83931c2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -69,7 +69,6 @@ public class OrcEncodedDataConsumer
   private CompressionCodec codec;
   private List<ConsumerStripeMetadata> stripes;
   private final boolean skipCorrupt; // TODO: get rid of this
-  private final QueryFragmentCounters counters;
   private SchemaEvolution evolution;
   private IoTrace trace;
   private final Includes includes;
@@ -79,11 +78,10 @@ public class OrcEncodedDataConsumer
   public OrcEncodedDataConsumer(
     Consumer<ColumnVectorBatch> consumer, Includes includes, boolean skipCorrupt,
     QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics) {
-    super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics);
+    super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics, counters);
     this.includes = includes;
     // TODO: get rid of this
     this.skipCorrupt = skipCorrupt;
-    this.counters = counters;
   }
 
   public void setUseDecimal64ColumnVectors(final boolean useDecimal64ColumnVectors) {
@@ -209,7 +207,7 @@ public class OrcEncodedDataConsumer
         counters.incrCounter(LlapIOCounters.ROWS_EMITTED, batchSize);
       }
       LlapIoImpl.ORC_LOGGER.debug("Done with decode");
-      counters.incrTimeCounter(LlapIOCounters.DECODE_TIME_NS, startTime);
+      counters.incrWallClockCounter(LlapIOCounters.DECODE_TIME_NS, startTime);
       counters.incrCounter(LlapIOCounters.NUM_VECTOR_BATCHES, maxBatchesRG);
       counters.incrCounter(LlapIOCounters.NUM_DECODED_BATCHES);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 4f5b0a9..74cee64 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -443,7 +443,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private void recordReaderTime(long startTime) {
-    counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
+    counters.incrWallClockCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
   }
 
   private void validateFileMetadata() throws IOException {
@@ -519,7 +519,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       }
     }
     orcReader = EncodedOrcFile.createReader(path, opts);
-    counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+    counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
   }
 
   /**
@@ -677,7 +677,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     DiskRangeList footerRange = rawDataReader.readFileData(
         new DiskRangeList(offset, offset + si.getFooterLength()), 0, false);
     // LOG.error("Got " + RecordReaderUtils.stringifyDiskRanges(footerRange));
-    counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+    counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
     assert footerRange.next == null; // Can only happens w/zcr for a single input buffer.
     if (hasCache) {
       LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(
@@ -716,7 +716,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       if (!isRawDataReaderOpen && isOpen) {
         long startTime = counters.startTimeCounter();
         rawDataReader.open();
-        counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+        counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
       }
       return;
     }
@@ -734,7 +734,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       rawDataReader.open();
       isRawDataReaderOpen = true;
     }
-    counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+    counters.incrWallClockCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/72349bb3/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 658bc7d..a5671e9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -1688,7 +1688,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private void recordReaderTime(long startTime) {
-    counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
+    counters.incrWallClockCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
   }
 
   private boolean processStop() {