Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/08/22 09:05:26 UTC

carbondata git commit: [CARBONDATA-1169] Support input read bytes size / Record metrics in the UI

Repository: carbondata
Updated Branches:
  refs/heads/master c81c3b196 -> e3f98fa43


[CARBONDATA-1169] Support input read bytes size / Record metrics in the UI

Requirement: Report input size / record metrics for the Carbon UI.

Solution: Add input read bytes and record count details to the Carbon UI.
Example: Execute any query (e.g., select *) and check the input size / record details in the UI.

Input size metrics: We could rely on Hadoop FileSystem statistics, but they are kept in thread-local variables. That is fine when the RDD computation chain runs on a single thread, but Carbon spawns multiple threads for B-Tree loading, dictionary loading, block reads, etc. So we maintain one global map that tracks the read bytes of every spawned thread, and return the total task read bytes by summing the read bytes of the parent thread and all of its spawned threads.
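
The idea behind TaskMetricsMap can be summarised with the minimal, self-contained Java sketch below. It is illustrative only: SimpleTaskMetrics, PARENT_ID, METRICS and the simulated byte counts are made-up names, and the real class reads the per-thread bytes from Hadoop FileSystem.Statistics instead of a hard-coded value.

  import java.util.Collections;
  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.CopyOnWriteArrayList;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicLong;

  public class SimpleTaskMetrics {

    // parent (task) thread id, inherited by every thread the task thread spawns
    static final InheritableThreadLocal<Long> PARENT_ID = new InheritableThreadLocal<>();

    // parent thread id -> read-byte counters of all threads spawned for that task
    static final Map<Long, List<AtomicLong>> METRICS = new ConcurrentHashMap<>();

    // called by each spawned thread before it starts reading
    static AtomicLong register() {
      AtomicLong counter = new AtomicLong();
      METRICS.computeIfAbsent(PARENT_ID.get(),
          k -> new CopyOnWriteArrayList<AtomicLong>()).add(counter);
      return counter;
    }

    // called by the task thread to report the bytes read by itself and its children
    static long readBytesSum(long parentThreadId) {
      long sum = 0;
      for (AtomicLong counter :
          METRICS.getOrDefault(parentThreadId, Collections.<AtomicLong>emptyList())) {
        sum += counter.get();
      }
      return sum;
    }

    public static void main(String[] args) throws InterruptedException {
      long taskThreadId = Thread.currentThread().getId();
      PARENT_ID.set(taskThreadId);   // set once per task, compare CarbonRDD.compute() in the diff

      ExecutorService pool = Executors.newFixedThreadPool(3);
      for (int i = 0; i < 3; i++) {
        final long simulatedBytes = (i + 1) * 1024L;   // stand-in for FileSystem.Statistics bytes read
        pool.submit(() -> {
          AtomicLong counter = register();             // child registers under the inherited parent id
          counter.addAndGet(simulatedBytes);           // bytes this thread "read"
        });
      }
      pool.shutdown();
      pool.awaitTermination(1, TimeUnit.MINUTES);

      System.out.println("total task read bytes = " + readBytesSum(taskThreadId));   // 6144
      METRICS.remove(taskThreadId);   // clear the entry once the task finishes
    }
  }

In the actual patch the same pattern is driven by CarbonRDD.compute() setting TaskMetricsMap.threadLocal, each worker calling registerThreadCallback() / updateReadBytes(), and CarbonScanRDD summing and clearing on task completion, as shown in the diff below.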

Record metrics: increment the record count for each row read.
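
As a rough illustration of that per-row hook, the sketch below uses hypothetical names (RecordCounter mirrors InputMetricsStats.incrementRecordRead; the real readers are CarbonRecordReader and VectorizedCarbonRecordReader in the diff below).

  import java.util.Arrays;
  import java.util.Iterator;
  import java.util.concurrent.atomic.AtomicLong;

  public class RecordMetricsSketch {

    // trimmed-down stand-in for the InputMetricsStats callback
    interface RecordCounter {
      void incrementRecordRead(Long recordsRead);
    }

    static class CountingReader {
      private final Iterator<String> rows;
      private final RecordCounter metrics;   // may be null when metrics are not wired in

      CountingReader(Iterator<String> rows, RecordCounter metrics) {
        this.rows = rows;
        this.metrics = metrics;
      }

      boolean nextKeyValue() {
        return rows.hasNext();
      }

      String getCurrentValue() {
        if (metrics != null) {
          metrics.incrementRecordRead(1L);   // one increment per row handed back to the caller
        }
        return rows.next();
      }
    }

    public static void main(String[] args) {
      AtomicLong recordsRead = new AtomicLong();
      CountingReader reader = new CountingReader(
          Arrays.asList("row1", "row2", "row3").iterator(),
          count -> recordsRead.addAndGet(count));
      while (reader.nextKeyValue()) {
        reader.getCurrentValue();
      }
      System.out.println("records read = " + recordsRead.get());   // prints 3
    }
  }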

This closes #918


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e3f98fa4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e3f98fa4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e3f98fa4

Branch: refs/heads/master
Commit: e3f98fa4378e068af5ccdcbb7b2ce1fdb4684601
Parents: c81c3b1
Author: Manohar <ma...@gmail.com>
Authored: Wed May 17 12:29:04 2017 +0530
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Tue Aug 22 14:35:05 2017 +0530

----------------------------------------------------------------------
 .../dictionary/ForwardDictionaryCache.java      |  47 +++--
 .../core/datastore/BlockIndexStore.java         |  12 +-
 .../processor/AbstractDataBlockIterator.java    |  19 +-
 .../carbondata/core/util/TaskMetricsMap.java    | 207 +++++++++++++++++++
 .../carbondata/hadoop/CarbonRecordReader.java   |  10 +
 .../carbondata/hadoop/InputMetricsStats.java    |  38 ++++
 .../carbondata/spark/InitInputMetrics.java      |  32 +++
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |   3 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  25 ++-
 .../org/apache/spark/CarbonInputMetrics.scala   |  66 ++++++
 .../scala/org/apache/spark/sql/CarbonScan.scala |   7 +-
 .../VectorizedCarbonRecordReader.java           |  10 +-
 .../org/apache/spark/CarbonInputMetrics.scala   |  78 +++++++
 .../sql/CarbonDatasourceHadoopRelation.scala    |   7 +-
 14 files changed, 518 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java
index b23bd49..6500bb9 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCache.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.cache.CarbonLRUCache;
 import org.apache.carbondata.core.reader.CarbonDictionaryColumnMetaChunk;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ObjectSizeCalculator;
+import org.apache.carbondata.core.util.TaskMetricsMap;
 
 /**
  * This class implements methods to create dictionary cache which will hold
@@ -100,28 +101,36 @@ public class ForwardDictionaryCache<K extends
     for (final DictionaryColumnUniqueIdentifier uniqueIdent : dictionaryColumnUniqueIdentifiers) {
       taskSubmitList.add(executorService.submit(new Callable<Dictionary>() {
         @Override public Dictionary call() throws IOException {
-          // in case of multiple task for same query same executor
-          // only one task should load the dictionary
-          // others will wait on monitor and get the loaded dictionary values
-          Object lockObject = DICTIONARY_LOCK_OBJECT.get(uniqueIdent);
-          // if lock object is null
-          if (null == lockObject) {
-            // Acquire the lock on map
-            synchronized (DICTIONARY_LOCK_OBJECT) {
-              // double checking the dictionary lock object
-              lockObject = DICTIONARY_LOCK_OBJECT.get(uniqueIdent);
-              // if still it is null add new lock object
-              if (null == lockObject) {
-                lockObject = new Object();
-                DICTIONARY_LOCK_OBJECT.put(uniqueIdent, lockObject);
+          try {
+            // Register thread callback for calculating metrics
+            TaskMetricsMap.getInstance().registerThreadCallback();
+            // in case of multiple task for same query same executor
+            // only one task should load the dictionary
+            // others will wait on monitor and get the loaded dictionary values
+            Object lockObject = DICTIONARY_LOCK_OBJECT.get(uniqueIdent);
+            // if lock object is null
+            if (null == lockObject) {
+              // Acquire the lock on map
+              synchronized (DICTIONARY_LOCK_OBJECT) {
+                // double checking the dictionary lock object
+                lockObject = DICTIONARY_LOCK_OBJECT.get(uniqueIdent);
+                // if still it is null add new lock object
+                if (null == lockObject) {
+                  lockObject = new Object();
+                  DICTIONARY_LOCK_OBJECT.put(uniqueIdent, lockObject);
+                }
               }
             }
+            Dictionary dictionary = null;
+            synchronized (lockObject) {
+              dictionary = getDictionary(uniqueIdent);
+            }
+            return dictionary;
+          }  finally {
+            // update read bytes metrics for this thread
+            TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId());
           }
-          Dictionary dictionary = null;
-          synchronized (lockObject) {
-            dictionary = getDictionary(uniqueIdent);
-          }
-          return dictionary;
+
         }
       }));
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
index fad6ed6..c9c5b3d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
@@ -47,6 +47,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.TaskMetricsMap;
 
 /**
  * This class is used to load the B-Tree in Executor LRU Cache
@@ -290,8 +291,15 @@ public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
     }
 
     @Override public AbstractIndex call() throws Exception {
-      // load and return the loaded blocks
-      return get(tableBlockUniqueIdentifier);
+      try {
+        //register thread callback for calculating metrics
+        TaskMetricsMap.getInstance().registerThreadCallback();
+        // load and return the loaded blocks
+        return get(tableBlockUniqueIdentifier);
+      } finally {
+        // update read bytes metrics for this thread
+        TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index ff4f5dd..798d331 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.scan.scanner.BlockletScanner;
 import org.apache.carbondata.core.scan.scanner.impl.FilterScanner;
 import org.apache.carbondata.core.scan.scanner.impl.NonFilterScanner;
 import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.TaskMetricsMap;
 
 /**
  * This abstract class provides a skeletal implementation of the
@@ -198,14 +199,20 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
   private Future<BlocksChunkHolder> executeRead() {
     return executorService.submit(new Callable<BlocksChunkHolder>() {
       @Override public BlocksChunkHolder call() throws Exception {
-        if (dataBlockIterator.hasNext()) {
-          BlocksChunkHolder blocksChunkHolder = getBlocksChunkHolder();
-          if (blocksChunkHolder != null) {
-            blockletScanner.readBlocklet(blocksChunkHolder);
-            return blocksChunkHolder;
+        try {
+          TaskMetricsMap.getInstance().registerThreadCallback();
+          if (dataBlockIterator.hasNext()) {
+            BlocksChunkHolder blocksChunkHolder = getBlocksChunkHolder();
+            if (blocksChunkHolder != null) {
+              blockletScanner.readBlocklet(blocksChunkHolder);
+              return blocksChunkHolder;
+            }
           }
+          return null;
+        } finally {
+          // update read bytes metrics for this thread
+          TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId());
         }
-        return null;
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java b/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
new file mode 100644
index 0000000..0dc6c97
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * This class maintains task level metrics info for all spawned child threads and parent task thread
+ */
+public class TaskMetricsMap {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(TaskMetricsMap.class.getName());
+
+  public static final InheritableThreadLocal<Long> threadLocal = new InheritableThreadLocal<>();
+  /**
+   * In this map we are maintaining all spawned child threads callback info for each parent thread
+   * here key = parent thread id & values =  list of spawned child threads callbacks
+   */
+  public static Map<Long, List<CarbonFSBytesReadOnThreadCallback>> metricMap =
+      new ConcurrentHashMap<>();
+
+  public static TaskMetricsMap taskMetricsMap = new TaskMetricsMap();
+
+  public static TaskMetricsMap getInstance() {
+    return taskMetricsMap;
+  }
+
+  /**
+   * registers current thread callback using parent thread id
+   *
+   * @return
+   */
+  public void registerThreadCallback() {
+    // parent thread id should not be null as we are setting the same for all RDDs
+    if (null != threadLocal.get()) {
+      long parentThreadId = threadLocal.get();
+      new CarbonFSBytesReadOnThreadCallback(parentThreadId);
+    }
+  }
+
+  /**
+   * removes parent thread entry from map.
+   *
+   * @param threadId
+   */
+  public void removeEntry(long threadId) {
+    metricMap.remove(threadId);
+  }
+
+  /**
+   * returns all spawned child threads callback list of given parent thread
+   *
+   * @param threadId
+   * @return
+   */
+  public List<CarbonFSBytesReadOnThreadCallback> getCallbackList(long threadId) {
+    return metricMap.get(threadId);
+  }
+
+  public boolean isCallbackEmpty(long threadId) {
+    List<CarbonFSBytesReadOnThreadCallback> callbackList = getCallbackList(threadId);
+    if (null == callbackList) {
+      return true;
+    }
+    return callbackList.isEmpty();
+  }
+
+  /**
+   * This function updates read bytes of given thread
+   * After completing the task, each spawned child thread should update current read bytes,
+   * by calling this function.
+   *
+   * @param callbackThreadId
+   */
+  public void updateReadBytes(long callbackThreadId) {
+    // parent thread id should not be null as we are setting the same for all RDDs
+    if (null != threadLocal.get()) {
+      long parentThreadId = threadLocal.get();
+      List<CarbonFSBytesReadOnThreadCallback> callbackList = getCallbackList(parentThreadId);
+      if (null != callbackList) {
+        for (CarbonFSBytesReadOnThreadCallback callback : callbackList) {
+          if (callback.threadId == callbackThreadId) {
+            callback.updatedReadBytes += callback.readbytes();
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * returns total task read bytes, by summing all parent & spawned threads readbytes
+   *
+   * @param threadName
+   * @return
+   */
+  public long getReadBytesSum(long threadName) {
+    List<CarbonFSBytesReadOnThreadCallback> callbacks = getCallbackList(threadName);
+    long sum = 0;
+    if (null != callbacks) {
+      for (CarbonFSBytesReadOnThreadCallback callback : callbacks) {
+        sum += callback.getReadBytes();
+      }
+    }
+    return sum;
+  }
+
+  public void clear() {
+    metricMap.clear();
+  }
+
+  /**
+   * adds spawaned thread callback entry in metricmap using parentThreadId
+   *
+   * @param parentThreadId
+   * @param callback
+   */
+  private void addEntry(long parentThreadId, CarbonFSBytesReadOnThreadCallback callback) {
+    List<CarbonFSBytesReadOnThreadCallback> callbackList = getCallbackList(parentThreadId);
+    if (null == callbackList) {
+      //create new list
+      List<CarbonFSBytesReadOnThreadCallback> list = new CopyOnWriteArrayList<>();
+      list.add(callback);
+      metricMap.put(parentThreadId, list);
+    } else {
+      // add to existing list
+      callbackList.add(callback);
+    }
+  }
+
+  /**
+   * This class maintains getReadBytes info of each thread
+   */
+  class CarbonFSBytesReadOnThreadCallback {
+    long baseline = 0;
+    long updatedReadBytes = 0;
+    long threadId = Thread.currentThread().getId();
+
+    CarbonFSBytesReadOnThreadCallback(long parentThread) {
+      // reads current thread readBytes
+      this.baseline = readbytes();
+      addEntry(parentThread, this);
+    }
+
+    /**
+     * returns current thread readbytes from FileSystem Statistics
+     *
+     * @return
+     */
+    public long readbytes() {
+      List<FileSystem.Statistics> statisticsList = FileSystem.getAllStatistics();
+      long sum = 0;
+      try {
+        for (FileSystem.Statistics statistics : statisticsList) {
+          Class statisticsClass = Class.forName(statistics.getClass().getName());
+          Method getThreadStatisticsMethod =
+              statisticsClass.getDeclaredMethod("getThreadStatistics");
+          Class statisticsDataClass =
+              Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData");
+          Method getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead");
+          sum += (Long) getBytesReadMethod
+              .invoke(statisticsDataClass.cast(getThreadStatisticsMethod.invoke(statistics, null)),
+                  null);
+        }
+      } catch (Exception ex) {
+        LOGGER.debug(ex.getLocalizedMessage());
+      }
+      return sum;
+    }
+
+    /**
+     * After completing task, each child thread should update corresponding
+     * read bytes using updatedReadBytes method.
+     * if updatedReadBytes > 0 then return updatedReadBytes (i.e thread read bytes).
+     *
+     * @return
+     */
+    public long getReadBytes() {
+      return updatedReadBytes - baseline;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 5af4c30..bf25426 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -47,6 +47,13 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
   protected CarbonIterator<Object[]> carbonIterator;
 
   protected QueryExecutor queryExecutor;
+  private InputMetricsStats inputMetricsStats;
+
+  public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
+      InputMetricsStats inputMetricsStats) {
+    this(queryModel, readSupport);
+    this.inputMetricsStats = inputMetricsStats;
+  }
 
   public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) {
     this.queryModel = queryModel;
@@ -92,6 +99,9 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
 
   @Override public T getCurrentValue() throws IOException, InterruptedException {
     rowCount += 1;
+    if (null != inputMetricsStats) {
+      inputMetricsStats.incrementRecordRead(1L);
+    }
     return readSupport.readRow(carbonIterator.next());
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java
new file mode 100644
index 0000000..e678100
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop;
+
+import java.io.Serializable;
+import java.lang.Long;
+
+/**
+ * It gives statistics of number of bytes and record read
+ */
+public interface InputMetricsStats extends Serializable {
+
+  /**
+   * increment if record is read
+   */
+  void incrementRecordRead(Long recordRead);
+
+  /**
+   * update hdfs byte read
+   */
+  void updateAndClose();
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
new file mode 100644
index 0000000..8574a3a
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark;
+
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.InputMetricsStats;
+
+import org.apache.spark.TaskContext;
+
+
+/**
+ * Initializes bytes read call back
+ */
+public interface InitInputMetrics extends InputMetricsStats {
+
+  void initBytesReadCallback(TaskContext context, CarbonMultiBlockSplit carbonMultiBlockSplit);
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index a35c896..a038ff3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext
 import org.apache.spark.rdd.RDD
 
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, CarbonTaskInfo, SessionParams, ThreadLocalSessionInfo, ThreadLocalTaskInfo}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, CarbonTaskInfo, SessionParams, TaskMetricsMap, ThreadLocalSessionInfo, ThreadLocalTaskInfo}
 
 /**
  * This RDD maintains session level ThreadLocal
@@ -53,6 +53,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
 
   final def compute(split: Partition, context: TaskContext): Iterator[T] = {
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+    TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
     val carbonTaskInfo = new CarbonTaskInfo
     carbonTaskInfo.setTaskId(System.nanoTime)
     ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 4a3352d..2a1a781 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -37,9 +37,10 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, TaskMetricsMap}
 import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
 
@@ -54,7 +55,7 @@ class CarbonScanRDD(
     filterExpression: Expression,
     identifier: AbsoluteTableIdentifier,
     serializedTableInfo: Array[Byte],
-    @transient tableInfo: TableInfo)
+    @transient tableInfo: TableInfo, inputMetricsStats: InitInputMetrics)
   extends CarbonRDDWithTableInfo[InternalRow](sc, Nil, serializedTableInfo) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
@@ -187,19 +188,23 @@ class CarbonScanRDD(
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
     val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+    TaskMetricsMap.getInstance().registerThreadCallback()
+    inputMetricsStats.initBytesReadCallback(context, inputSplit)
     val iterator = if (inputSplit.getAllSplits.size() > 0) {
       val model = format.getQueryModel(inputSplit, attemptContext)
       val reader = {
         if (vectorReader) {
-          val carbonRecordReader = createVectorizedCarbonRecordReader(model)
+          val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats)
           if (carbonRecordReader == null) {
             new CarbonRecordReader(model,
-              format.getReadSupportClass(attemptContext.getConfiguration))
+              format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats)
           } else {
             carbonRecordReader
           }
         } else {
-          new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration))
+          new CarbonRecordReader(model,
+            format.getReadSupportClass(attemptContext.getConfiguration),
+            inputMetricsStats)
         }
       }
 
@@ -213,6 +218,7 @@ class CarbonScanRDD(
         context.addTaskCompletionListener { context =>
           logStatistics(queryStartTime, model.getStatisticsRecorder)
           reader.close()
+        close()
         }
 
         override def hasNext: Boolean = {
@@ -234,6 +240,10 @@ class CarbonScanRDD(
           val value = reader.getCurrentValue
           value
         }
+        private def close() {
+          TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
+          inputMetricsStats.updateAndClose()
+      }
       }
     } else {
       new Iterator[Any] {
@@ -284,12 +294,13 @@ class CarbonScanRDD(
     firstOptionLocation
   }
 
-  def createVectorizedCarbonRecordReader(queryModel: QueryModel): RecordReader[Void, Object] = {
+  def createVectorizedCarbonRecordReader(queryModel: QueryModel,
+      inputMetricsStats: InputMetricsStats): RecordReader[Void, Object] = {
     val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
     try {
       val cons = Class.forName(name).getDeclaredConstructors
       cons.head.setAccessible(true)
-      cons.head.newInstance(queryModel).asInstanceOf[RecordReader[Void, Object]]
+      cons.head.newInstance(queryModel, inputMetricsStats).asInstanceOf[RecordReader[Void, Object]]
     } catch {
       case e: Exception =>
         LOGGER.error(e)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
new file mode 100644
index 0000000..ea75ccb
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark
+
+import java.lang.Long
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.hadoop.{CarbonMultiBlockSplit, InputMetricsStats}
+import org.apache.carbondata.spark.InitInputMetrics
+
+/**
+ * It gives statistics of number of bytes and record read
+ */
+class CarbonInputMetrics extends InitInputMetrics {
+  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+  var inputMetrics: InputMetrics = _
+  var bytesReadCallback: Option[() => scala.Long] = _
+  var carbonMultiBlockSplit: CarbonMultiBlockSplit = _
+
+  def initBytesReadCallback(context: TaskContext,
+                            carbonMultiBlockSplit: CarbonMultiBlockSplit) {
+    inputMetrics = context.taskMetrics().getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+    this.carbonMultiBlockSplit = carbonMultiBlockSplit;
+    bytesReadCallback = carbonMultiBlockSplit match {
+      case _: CarbonMultiBlockSplit =>
+        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+      case _ => None
+    }
+  }
+
+  def incrementRecordRead(recordRead: Long) {
+    inputMetrics.incRecordsRead(recordRead)
+  }
+
+  def updateAndClose() {
+    if (bytesReadCallback.isDefined) {
+      inputMetrics.updateBytesRead()
+    } else if (carbonMultiBlockSplit.isInstanceOf[CarbonMultiBlockSplit]) {
+      // If we can't get the bytes read from the FS stats, fall back to the split size,
+      // which may be inaccurate.
+      try {
+        inputMetrics.incBytesRead(carbonMultiBlockSplit.getLength)
+      } catch {
+        case e: java.io.IOException =>
+          LOGGER.warn("Unable to get input size to set InputMetrics for task:" + e.getMessage)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 3bd92e9..a3c6343 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.CarbonInputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -27,7 +28,7 @@ import org.apache.spark.sql.execution.LeafNode
 import org.apache.spark.sql.hive.CarbonMetastore
 
 import org.apache.carbondata.core.scan.model._
-import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.hadoop.{CarbonProjection, InputMetricsStats}
 import org.apache.carbondata.spark.CarbonFilters
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
@@ -122,14 +123,14 @@ case class CarbonScan(
     columnProjection.foreach { attr =>
       projection.addColumn(attr.name)
     }
-
+    val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
     new CarbonScanRDD(
       ocRaw.sparkContext,
       projection,
       buildCarbonPlan.getFilterExpression,
       carbonTable.getAbsoluteTableIdentifier,
       carbonTable.getTableInfo.serialize(),
-      carbonTable.getTableInfo
+      carbonTable.getTableInfo, inputMetricsStats
     )
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 173c527..4914d8d 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.AbstractRecordReader;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.InputMetricsStats;
 import org.apache.carbondata.spark.util.CarbonScalaUtil;
 
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -81,8 +82,11 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
   private QueryExecutor queryExecutor;
 
-  public VectorizedCarbonRecordReader(QueryModel queryModel) {
+  private InputMetricsStats inputMetricsStats;
+
+  public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats) {
     this.queryModel = queryModel;
+    this.inputMetricsStats = inputMetricsStats;
     enableReturningBatches();
   }
 
@@ -149,7 +153,9 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
   @Override public Object getCurrentValue() throws IOException, InterruptedException {
     if (returnColumnarBatch) {
-      rowCount += columnarBatch.numValidRows();
+      int value = columnarBatch.numValidRows();
+      rowCount += value;
+      inputMetricsStats.incrementRecordRead(new Long(value));
       return columnarBatch;
     }
     rowCount += 1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
new file mode 100644
index 0000000..b562ebc
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark
+
+import java.lang.Long
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.InputMetrics
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.TaskMetricsMap
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
+import org.apache.carbondata.spark.InitInputMetrics
+
+
+/**
+ * It gives statistics of number of bytes and record read
+ */
+class CarbonInputMetrics extends InitInputMetrics{
+  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    var inputMetrics: InputMetrics = _
+    // bytes read before compute by other map rdds in lineage
+    var existingBytesRead: Long = _
+    var carbonMultiBlockSplit: CarbonMultiBlockSplit = _
+
+  def initBytesReadCallback(context: TaskContext,
+      carbonMultiBlockSplit: CarbonMultiBlockSplit) {
+    inputMetrics = context.taskMetrics().inputMetrics
+    existingBytesRead = inputMetrics.bytesRead
+    this.carbonMultiBlockSplit = carbonMultiBlockSplit;
+  }
+
+  def incrementRecordRead(recordRead: Long) {
+    val value : scala.Long = recordRead
+    inputMetrics.incRecordsRead(value)
+    if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+      updateBytesRead()
+    }
+  }
+
+  def updateBytesRead(): Unit = {
+    inputMetrics
+      .setBytesRead(existingBytesRead
+                    + TaskMetricsMap.getInstance().getReadBytesSum(Thread.currentThread().getId))
+  }
+
+  def updateAndClose() {
+    // if metrics supported file system ex: hdfs
+    if (!TaskMetricsMap.getInstance().isCallbackEmpty(Thread.currentThread().getId)) {
+      updateBytesRead()
+     // after update clear parent thread entry from map.
+      TaskMetricsMap.getInstance().removeEntry(Thread.currentThread().getId)
+    } else if (carbonMultiBlockSplit.isInstanceOf[CarbonMultiBlockSplit]) {
+      // If we can't get the bytes read from the FS stats, fall back to the split size,
+      // which may be inaccurate.
+      try {
+        inputMetrics.incBytesRead(carbonMultiBlockSplit.getLength)
+      } catch {
+        case e: java.io.IOException =>
+          LOGGER.warn("Unable to get input size to set InputMetrics for task:" + e.getMessage)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e3f98fa4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index d8a2978..303a8b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayOutputStream, DataOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.CarbonInputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.LoadTableByInsert
@@ -34,7 +35,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
-import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.hadoop.{CarbonProjection, InputMetricsStats}
 import org.apache.carbondata.spark.CarbonFilters
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
@@ -71,14 +72,14 @@ case class CarbonDatasourceHadoopRelation(
 
     val projection = new CarbonProjection
     requiredColumns.foreach(projection.addColumn)
-
+    val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
     new CarbonScanRDD(
       sqlContext.sparkContext,
       projection,
       filterExpression.orNull,
       identifier,
       carbonTable.getTableInfo.serialize(),
-      carbonTable.getTableInfo)
+      carbonTable.getTableInfo, inputMetricsStats)
   }
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)