Posted to commits@carbondata.apache.org by ak...@apache.org on 2019/08/08 11:54:46 UTC

[carbondata] branch master updated: [CARBONDATA-3487] wrong Input metrics (size/record) displayed in spark UI during insert into

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new bbeb974  [CARBONDATA-3487] wrong Input metrics (size/record) displayed in spark UI during insert into
bbeb974 is described below

commit bbeb974e22d1b87670926d83dcac30c5ef139856
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Thu Aug 1 17:16:44 2019 +0530

    [CARBONDATA-3487] wrong Input metrics (size/record) displayed in spark UI during insert into
    
    Problem: Wrong input metrics (size/record) are displayed in the Spark UI during insert into.
    
    Cause:
    a) The size metric was wrong because the update never reached the Spark input metrics:
    NewDataFrameRDD set a thread-local id, which was then overridden by the ScanRDD
    thread-local id because the RDDs are nested (multi-level RDD).
    b) The record metric was wrong because, during insert into, the ScanRDD is invoked by
    multiple threads, and there was no synchronisation on the single task-level Spark
    input metric shared by all the concurrent ScanRDD instances.
    
    Solution:
    a) To fix the size metric, set the thread local in the parent RDD and do not overwrite
    it in the child RDD if it is already registered.
    b) To fix the record metric, synchronise updates to the record metric, and reduce the
    locking frequency by flushing only after a configurable record interval.
    
    This closes #3345
---
 .../core/constants/CarbonCommonConstants.java      | 10 ++++++++
 .../carbondata/core/util/CarbonProperties.java     | 24 +++++++++++++++++
 .../carbondata/core/util/TaskMetricsMap.java       | 21 ++++++++++++++-
 .../apache/carbondata/spark/InitInputMetrics.java  |  2 +-
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |  2 +-
 .../apache/carbondata/spark/rdd/CarbonRDD.scala    |  4 ++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |  2 +-
 .../org/apache/spark/CarbonInputMetrics.scala      | 30 ++++++++++++++++------
 .../datamap/IndexDataMapRebuildRDD.scala           |  2 +-
 9 files changed, 83 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 510bcee..17b191d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1307,6 +1307,16 @@ public final class CarbonCommonConstants {
   public static final String IS_DRIVER_INSTANCE_DEFAULT = "false";
 
   /**
+   * Property to set the input metrics update interval (in record count): after every
+   * interval, input metrics are updated to Spark; otherwise only at the end of the query.
+   */
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String INPUT_METRICS_UPDATE_INTERVAL = "carbon.input.metrics.update.interval";
+
+  public static final Long INPUT_METRICS_UPDATE_INTERVAL_DEFAULT = 500000L;
+
+
+  /**
    * property for enabling unsafe based query processing
    */
   @CarbonProperty(dynamicConfigurable = true)
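
The new interval is an ordinary dynamically configurable Carbon property, so it can be
tuned at runtime through the CarbonProperties API. A minimal sketch (the interval value
and class name are illustrative):

    import org.apache.carbondata.core.constants.CarbonCommonConstants;
    import org.apache.carbondata.core.util.CarbonProperties;

    public class SetInputMetricsInterval {
      public static void main(String[] args) {
        // flush input metrics to Spark after every 100000 records read,
        // instead of the 500000-record default
        CarbonProperties.getInstance()
            .addProperty(CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL, "100000");
      }
    }
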
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 5868664..c60dad8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1749,4 +1749,28 @@ public final class CarbonProperties {
     }
     return numOfThreadsForPruning;
   }
+
+  /**
+   * Validate and get the input metrics interval
+   *
+   * @return input metrics interval
+   */
+  public static Long getInputMetricsInterval() {
+    String metrics = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL);
+    if (metrics == null) {
+      return CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT;
+    } else {
+      try {
+        long configuredValue = Long.parseLong(metrics);
+        if (configuredValue < 0) {
+          return CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT;
+        } else {
+          return configuredValue;
+        }
+      } catch (Exception ex) {
+        return CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT;
+      }
+    }
+  }
 }
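
A quick sanity sketch of the fallback behaviour above, runnable once this commit is
applied (the class name is illustrative): values that are negative or fail to parse fall
back to the 500000-record default.

    import org.apache.carbondata.core.constants.CarbonCommonConstants;
    import org.apache.carbondata.core.util.CarbonProperties;

    public class InputMetricsIntervalFallback {
      public static void main(String[] args) {
        CarbonProperties props = CarbonProperties.getInstance();
        // negative value: rejected, the default is returned
        props.addProperty(CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL, "-1");
        System.out.println(CarbonProperties.getInputMetricsInterval());  // 500000
        // non-numeric value: parsing fails, the default is returned
        props.addProperty(CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL, "abc");
        System.out.println(CarbonProperties.getInputMetricsInterval());  // 500000
      }
    }
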
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java b/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
index 196fd64..9d4e11c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
@@ -36,7 +36,7 @@ public class TaskMetricsMap {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(TaskMetricsMap.class.getName());
 
-  public static final InheritableThreadLocal<Long> threadLocal = new InheritableThreadLocal<>();
+  private static final InheritableThreadLocal<Long> threadLocal = new InheritableThreadLocal<>();
   /**
    * In this map we are maintaining all spawned child threads callback info for each parent thread
    * here key = parent thread id & values =  list of spawned child threads callbacks
@@ -50,6 +50,25 @@ public class TaskMetricsMap {
     return taskMetricsMap;
   }
 
+  public static InheritableThreadLocal<Long> getThreadLocal() {
+    return threadLocal;
+  }
+
+
+  /**
+   * Initializes the thread local to the current thread id, unless it was already
+   * set by (and inherited from) a parent thread, in which case the parent thread
+   * id is kept.
+   */
+  public static void initializeThreadLocal() {
+    // In case of a multi-level RDD (e.g. the insert into scenario, where DataFrameRDD
+    // calls ScanRDD), the parent thread id must not be overwritten by the child
+    // thread id, so don't set it if it is already set.
+    if (threadLocal.get() == null) {
+      threadLocal.set(Thread.currentThread().getId());
+    }
+  }
+
   /**
    * registers current thread callback using parent thread id
    *
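
The null check matters because the thread local is inheritable: a thread spawned inside
the task already sees the parent's id, so initializeThreadLocal() must not replace it
with the child's own id. A minimal standalone sketch of that behaviour, assuming this
commit is applied (the class name is illustrative):

    import org.apache.carbondata.core.util.TaskMetricsMap;

    public class ThreadLocalGuardSketch {
      public static void main(String[] args) throws InterruptedException {
        TaskMetricsMap.initializeThreadLocal();      // parent RDD: stores the parent id
        final long parentId = Thread.currentThread().getId();
        Thread child = new Thread(() -> {
          // child RDD: no-op, the parent id was already inherited
          TaskMetricsMap.initializeThreadLocal();
          System.out.println(TaskMetricsMap.getThreadLocal().get() == parentId);  // true
        });
        child.start();
        child.join();
      }
    }
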
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
index 8574a3a..6e4ab40 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
@@ -28,5 +28,5 @@ import org.apache.spark.TaskContext;
  */
 public interface InitInputMetrics extends InputMetricsStats {
 
-  void initBytesReadCallback(TaskContext context, CarbonMultiBlockSplit carbonMultiBlockSplit);
+  void initBytesReadCallback(TaskContext context, CarbonMultiBlockSplit carbonMultiBlockSplit, Long inputMetricsInterval);
 }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 81699b4..ea83827 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -364,7 +364,7 @@ object DataLoadProcessBuilderOnSpark {
     TaskContext.get.addTaskCompletionListener { _ =>
       CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
     }
-    TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
+    TaskMetricsMap.initializeThreadLocal()
     val carbonTaskInfo = new CarbonTaskInfo
     carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
     ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index ce08f8f..bf3f862 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -51,6 +51,8 @@ abstract class CarbonRDD[T: ClassTag](
     info
   }
 
+  val inputMetricsInterval: Long = CarbonProperties.getInputMetricsInterval
+
   @transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf()
 
   val config = SparkSQLUtil.broadCastHadoopConf(sparkContext, hadoopConf)
@@ -73,7 +75,7 @@ abstract class CarbonRDD[T: ClassTag](
     TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
     carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", getConf)
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-    TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
+    TaskMetricsMap.initializeThreadLocal()
     val carbonTaskInfo = new CarbonTaskInfo
     carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
     ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 973baa6..73aba46 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -422,7 +422,7 @@ class CarbonScanRDD[T: ClassTag](
     val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
     TaskMetricsMap.getInstance().registerThreadCallback()
-    inputMetricsStats.initBytesReadCallback(context, inputSplit)
+    inputMetricsStats.initBytesReadCallback(context, inputSplit, inputMetricsInterval)
     val iterator = if (inputSplit.getAllSplits.size() > 0) {
       val model = format.createQueryModel(inputSplit, attemptContext, filterExpression)
       // one query id per table
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
index 41fc013..69a9bfd 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
@@ -18,10 +18,10 @@ package org.apache.spark
 
 import java.lang.Long
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.InputMetrics
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.TaskMetricsMap
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
 import org.apache.carbondata.spark.InitInputMetrics
@@ -35,20 +35,28 @@ class CarbonInputMetrics extends InitInputMetrics{
     var inputMetrics: InputMetrics = _
     // bytes read before compute by other map rdds in lineage
     var existingBytesRead: Long = _
+    var recordCount: Long = _
+    var inputMetricsInterval: Long = _
     var carbonMultiBlockSplit: CarbonMultiBlockSplit = _
 
   def initBytesReadCallback(context: TaskContext,
-      carbonMultiBlockSplit: CarbonMultiBlockSplit) {
+      carbonMultiBlockSplit: CarbonMultiBlockSplit, inputMetricsInterval: Long) {
     inputMetrics = context.taskMetrics().inputMetrics
     existingBytesRead = inputMetrics.bytesRead
-    this.carbonMultiBlockSplit = carbonMultiBlockSplit;
+    recordCount = 0L
+    this.inputMetricsInterval = inputMetricsInterval
+    this.carbonMultiBlockSplit = carbonMultiBlockSplit
   }
 
   def incrementRecordRead(recordRead: Long) {
-    val value : scala.Long = recordRead
-    inputMetrics.incRecordsRead(value)
-    if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
-      updateBytesRead()
+    val value: scala.Long = recordRead
+    recordCount = recordCount + value
+    if (recordCount > inputMetricsInterval) {
+      inputMetrics.synchronized {
+        inputMetrics.incRecordsRead(recordCount)
+        updateBytesRead()
+      }
+      recordCount = 0L
     }
   }
 
@@ -59,10 +67,16 @@ class CarbonInputMetrics extends InitInputMetrics{
   }
 
   def updateAndClose() {
+    if (recordCount > 0L) {
+      inputMetrics.synchronized {
+        inputMetrics.incRecordsRead(recordCount)
+      }
+      recordCount = 0L
+    }
     // if metrics supported file system ex: hdfs
     if (!TaskMetricsMap.getInstance().isCallbackEmpty(Thread.currentThread().getId)) {
       updateBytesRead()
-     // after update clear parent thread entry from map.
+      // after update clear parent thread entry from map.
       TaskMetricsMap.getInstance().removeEntry(Thread.currentThread().getId)
     } else if (carbonMultiBlockSplit.isInstanceOf[CarbonMultiBlockSplit]) {
       // If we can't get the bytes read from the FS stats, fall back to the split size,
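
The change above is a classic per-thread batching pattern: each concurrent reader
accumulates into its own recordCount and takes the shared lock only once per interval,
and updateAndClose() flushes the remainder when the task finishes. A minimal
self-contained sketch of the same pattern (an AtomicLong stands in for Spark's
task-level InputMetrics; all names here are illustrative):

    import java.util.concurrent.atomic.AtomicLong;

    class BatchedRecordMetric {
      // shared task-level counter: stands in for inputMetrics.recordsRead
      private final AtomicLong shared;
      private final long interval;
      private long localCount;  // one BatchedRecordMetric instance per reader thread

      BatchedRecordMetric(AtomicLong shared, long interval) {
        this.shared = shared;
        this.interval = interval;
      }

      void incrementRecordRead(long records) {
        localCount += records;
        if (localCount > interval) {
          // the real code increments inside inputMetrics.synchronized instead
          shared.addAndGet(localCount);
          localCount = 0L;
        }
      }

      void updateAndClose() {
        // flush whatever is left when the task ends
        if (localCount > 0L) {
          shared.addAndGet(localCount);
          localCount = 0L;
        }
      }
    }
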
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 31d1390..383100f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -338,7 +338,7 @@ class IndexDataMapRebuildRDD[K, V](
     val segmentId = inputSplit.getAllSplits.get(0).getSegment.getSegmentNo
     val segment = segments.find(p => p.getSegmentNo.equals(segmentId))
     if (segment.isDefined) {
-      inputMetrics.initBytesReadCallback(context, inputSplit)
+      inputMetrics.initBytesReadCallback(context, inputSplit, inputMetricsInterval)
 
       val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
       val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)