Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/05/11 05:24:55 UTC

[carbondata] branch master updated: [CARBONDATA-3812] Set output metrics for data load spark job

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8196dc2  [CARBONDATA-3812] Set output metrics for data load spark job
8196dc2 is described below

commit 8196dc26c7e0924fad54aba558df9dbea5000131
Author: QiangCai <qi...@qq.com>
AuthorDate: Sat May 9 11:52:24 2020 +0800

    [CARBONDATA-3812] Set output metrics for data load spark job
    
    Why is this PR needed?
    Data load jobs are missing output metrics. Please see the details in JIRA: CARBONDATA-3812.
    
    What changes were proposed in this PR?
    Refactor OutputFilesInfoHolder into DataLoadMetrics.
    Add two new metrics: numOutputBytes and numOutputRows.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3757
---
 .../apache/carbondata/core/util/CarbonUtil.java    | 13 ++---
 ...utFilesInfoHolder.java => DataLoadMetrics.java} | 60 ++++++++++++++--------
 .../hadoop/api/CarbonOutputCommitter.java          |  2 +-
 .../hadoop/api/CarbonTableOutputFormat.java        | 20 ++++----
 .../carbondata/hive/MapredCarbonOutputFormat.java  |  2 +-
 .../carbondata/hive/util/HiveCarbonUtil.java       |  4 +-
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |  4 ++
 .../spark/rdd/NewCarbonDataLoadRDD.scala           |  5 ++
 .../spark/sql/events/MergeIndexEventListener.scala |  9 ++--
 .../CarbonAlterTableCompactionCommand.scala        |  4 +-
 .../datasources/SparkCarbonTableFormat.scala       |  5 +-
 .../org/apache/spark/sql/util/SparkSQLUtil.scala   |  9 ++++
 .../loading/CarbonDataLoadConfiguration.java       | 12 ++---
 .../processing/loading/DataLoadProcessBuilder.java |  2 +-
 .../processing/loading/model/CarbonLoadModel.java  | 16 +++---
 .../loading/model/CarbonLoadModelBuilder.java      |  3 ++
 .../CarbonRowDataWriterProcessorStepImpl.java      |  3 ++
 .../loading/steps/DataWriterProcessorStepImpl.java |  3 ++
 .../store/CarbonFactDataHandlerModel.java          | 17 +++---
 .../store/writer/AbstractFactDataWriter.java       | 12 ++---
 20 files changed, 125 insertions(+), 80 deletions(-)
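
For context, the per-task flow this commit establishes is: each Spark task gets a fresh DataLoadMetrics on its CarbonLoadModel, the write steps accumulate rows and bytes into it, and the task closure publishes the totals to Spark's OutputMetrics via the new SparkSQLUtil helper (the helper lives under org.apache.spark.sql.util, apparently because the OutputMetrics setters are private[spark]). A minimal sketch of that flow, simplified from DataLoadProcessBuilderOnSpark below; runWriteTask and the counts are illustrative, not part of the commit:

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.util.SparkSQLUtil
    import org.apache.carbondata.core.util.DataLoadMetrics

    object LoadMetricsFlowSketch {
      def runWriteTask(context: TaskContext, metrics: DataLoadMetrics): Unit = {
        // the write steps record what this task produced (illustrative numbers)
        metrics.addOutputRows(1000L)
        metrics.addOutputBytes(4L * 1024 * 1024)
        // publish to Spark so the UI and listeners see bytesWritten / recordsWritten
        SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, metrics)
      }
    }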

diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ed33aa2..57bb093 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2628,25 +2628,26 @@ public final class CarbonUtil {
   public static void copyCarbonDataFileToCarbonStorePath(
       String localFilePath,
       String targetPath, long fileSizeInBytes,
-      OutputFilesInfoHolder outputFilesInfoHolder) throws CarbonDataWriterException {
+      DataLoadMetrics metrics) throws CarbonDataWriterException {
     if (targetPath.endsWith(".tmp") && localFilePath
         .endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
       // for partition case, write carbondata file directly to final path, keep index in temp path.
       // This can improve the commit job performance on s3a.
       targetPath =
           targetPath.substring(0, targetPath.lastIndexOf("/"));
-      if (outputFilesInfoHolder != null) {
-        outputFilesInfoHolder.addToPartitionPath(targetPath);
+      if (metrics != null) {
+        metrics.addToPartitionPath(targetPath);
       }
     }
     long targetSize = copyCarbonDataFileToCarbonStorePath(localFilePath, targetPath,
         fileSizeInBytes);
-    if (outputFilesInfoHolder != null) {
+    if (metrics != null) {
       // Storing the number of files written by each task.
-      outputFilesInfoHolder.incrementCount();
+      metrics.incrementCount();
       // Storing the files written by each task.
-      outputFilesInfoHolder.addToOutputFiles(targetPath + localFilePath
+      metrics.addToOutputFiles(targetPath + localFilePath
           .substring(localFilePath.lastIndexOf(File.separator)) + ":" + targetSize);
+      metrics.addOutputBytes(targetSize);
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/util/OutputFilesInfoHolder.java b/core/src/main/java/org/apache/carbondata/core/util/DataLoadMetrics.java
similarity index 73%
rename from core/src/main/java/org/apache/carbondata/core/util/OutputFilesInfoHolder.java
rename to core/src/main/java/org/apache/carbondata/core/util/DataLoadMetrics.java
index 24d3ecd..273c9eb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/OutputFilesInfoHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataLoadMetrics.java
@@ -21,10 +21,10 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-public class OutputFilesInfoHolder implements Serializable {
-
-  private static final long serialVersionUID = -1401375818456585241L;
-
+/**
+ * store data loading metrics
+ */
+public class DataLoadMetrics implements Serializable {
   // stores the count of files written per task
   private int fileCount;
 
@@ -37,6 +37,38 @@ public class OutputFilesInfoHolder implements Serializable {
 
   private long mergeIndexSize;
 
+  private long numOutputBytes = 0L;
+
+  private long numOutputRows = 0L;
+
+  public synchronized int getFileCount() {
+    return fileCount;
+  }
+
+  public synchronized List<String> getOutputFiles() {
+    return outputFiles;
+  }
+
+  public synchronized List<String> getPartitionPath() {
+    return partitionPath;
+  }
+
+  public long getMergeIndexSize() {
+    return mergeIndexSize;
+  }
+
+  public void setMergeIndexSize(long mergeIndexSize) {
+    this.mergeIndexSize = mergeIndexSize;
+  }
+
+  public synchronized long getNumOutputBytes() {
+    return numOutputBytes;
+  }
+
+  public synchronized long getNumOutputRows() {
+    return numOutputRows;
+  }
+
   public synchronized void incrementCount() {
     // can call in multiple threads in single task
     fileCount++;
@@ -56,23 +88,11 @@ public class OutputFilesInfoHolder implements Serializable {
     partitionPath.add(path);
   }
 
-  public int getFileCount() {
-    return fileCount;
-  }
-
-  public List<String> getOutputFiles() {
-    return outputFiles;
+  public synchronized void addOutputBytes(long numOutputBytes) {
+    this.numOutputBytes += numOutputBytes;
   }
 
-  public List<String> getPartitionPath() {
-    return partitionPath;
-  }
-
-  public long getMergeIndexSize() {
-    return mergeIndexSize;
-  }
-
-  public void setMergeIndexSize(long mergeIndexSize) {
-    this.mergeIndexSize = mergeIndexSize;
+  public synchronized void addOutputRows(long numOutputRows) {
+    this.numOutputRows += numOutputRows;
   }
 }
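
The accumulators above are synchronized because a single task can write with multiple threads (see the "can call in multiple threads in single task" comment). A minimal sketch of that contract, assuming carbondata-core on the classpath; the thread and row counts are illustrative:

    import org.apache.carbondata.core.util.DataLoadMetrics

    object MetricsThreadSafetySketch {
      def main(args: Array[String]): Unit = {
        val metrics = new DataLoadMetrics()
        val writers = (1 to 4).map { _ =>
          new Thread(new Runnable {
            override def run(): Unit = metrics.addOutputRows(250L)
          })
        }
        writers.foreach(_.start())
        writers.foreach(_.join())
        // no lost updates: addOutputRows is synchronized on the metrics instance
        assert(metrics.getNumOutputRows == 1000L)
      }
    }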
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 33979e5..02c8d4c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -285,7 +285,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     String segmentFileName = SegmentFileStore.genSegmentFileName(
         loadModel.getSegmentId(), String.valueOf(loadModel.getFactTimeStamp()));
     newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
-    newMetaEntry.setIndexSize("" + loadModel.getOutputFilesInfoHolder().getMergeIndexSize());
+    newMetaEntry.setIndexSize("" + loadModel.getMetrics().getMergeIndexSize());
     if (!StringUtils.isEmpty(size)) {
       newMetaEntry.setDataSize(size);
     }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 200eb44..ebac3d4 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -38,9 +38,9 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.ComplexDelimitersEnum;
@@ -242,7 +242,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
   public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
       final TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
-    loadModel.setOutputFilesInfoHolder(new OutputFilesInfoHolder());
+    loadModel.setMetrics(new DataLoadMetrics());
     String appName =
         taskAttemptContext.getConfiguration().get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
     if (null != appName) {
@@ -317,7 +317,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
     model.setTableName(CarbonTableOutputFormat.getTableName(conf));
     model.setCarbonTransactionalTable(true);
-    model.setOutputFilesInfoHolder(new OutputFilesInfoHolder());
+    model.setMetrics(new DataLoadMetrics());
     CarbonTable carbonTable = getCarbonTable(conf);
 
     // global dictionary is not supported since 2.0
@@ -482,17 +482,17 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
           // clean up the folders and files created locally for data load operation
           TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
         }
-        OutputFilesInfoHolder outputFilesInfoHolder = loadModel.getOutputFilesInfoHolder();
-        if (null != outputFilesInfoHolder) {
+        DataLoadMetrics metrics = loadModel.getMetrics();
+        if (null != metrics) {
           taskAttemptContext.getConfiguration()
-              .set("carbon.number.of.output.files", outputFilesInfoHolder.getFileCount() + "");
-          if (outputFilesInfoHolder.getOutputFiles() != null) {
+              .set("carbon.number.of.output.files", metrics.getFileCount() + "");
+          if (metrics.getOutputFiles() != null) {
             appendConfiguration(taskAttemptContext.getConfiguration(), "carbon.output.files.name",
-                outputFilesInfoHolder.getOutputFiles());
+                metrics.getOutputFiles());
           }
-          if (outputFilesInfoHolder.getPartitionPath() != null) {
+          if (metrics.getPartitionPath() != null) {
             appendConfiguration(taskAttemptContext.getConfiguration(),
-                "carbon.output.partitions.name", outputFilesInfoHolder.getPartitionPath());
+                "carbon.output.partitions.name", metrics.getPartitionPath());
           }
         }
         LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID());
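
After the rename, the record writer's close() still publishes a per-task summary into the task attempt's Hadoop configuration under the same keys. A sketch of reading that summary back; readTaskSummary is illustrative, not an API from this commit:

    import org.apache.hadoop.conf.Configuration

    object TaskSummarySketch {
      def readTaskSummary(conf: Configuration): Unit = {
        val fileCount = conf.get("carbon.number.of.output.files", "0").toInt
        val files = Option(conf.get("carbon.output.files.name")).getOrElse("<none>")
        val partitions = Option(conf.get("carbon.output.partitions.name")).getOrElse("<none>")
        println(s"wrote $fileCount file(s): $files; partition paths: $partitions")
      }
    }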
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
index 3a8c7a7..a486fc0 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
@@ -102,7 +102,7 @@ public class MapredCarbonOutputFormat<T> extends CarbonTableOutputFormat
         partitionInfo != null ? partitionInfo.getColumnSchemaList().size() : 0;
     String finalOutputPath = FileFactory.getCarbonFile(finalOutPath.toString()).getAbsolutePath();
     if (carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().isHivePartitionTable()) {
-      carbonLoadModel.getOutputFilesInfoHolder().addToPartitionPath(finalOutputPath);
+      carbonLoadModel.getMetrics().addToPartitionPath(finalOutputPath);
       context.getConfiguration().set("carbon.outputformat.writepath", finalOutputPath);
     }
     CarbonTableOutputFormat.setLoadModel(jc, carbonLoadModel);
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
index b611edb..29b2e4d 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
@@ -49,7 +49,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.ThriftWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -151,7 +151,7 @@ public class HiveCarbonUtil {
       throw new RuntimeException(e);
     }
     loadModel.setSkipParsers();
-    loadModel.setOutputFilesInfoHolder(new OutputFilesInfoHolder());
+    loadModel.setMetrics(new DataLoadMetrics());
     return loadModel;
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 8fb3002..55eee11 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -165,8 +165,10 @@ object DataLoadProcessBuilderOnSpark {
     sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
       setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator)
       val loadModel = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
+      loadModel.setMetrics(new DataLoadMetrics())
       DataLoadProcessorStepOnSpark.writeFunc(
         rows, context.partitionId, loadModel, writeStepRowCounter, conf.value.value)
+      SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, loadModel.getMetrics)
     })
 
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
@@ -252,8 +254,10 @@ object DataLoadProcessBuilderOnSpark {
     sc.runJob(newRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
       setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator)
       val loadModel = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
+      loadModel.setMetrics(new DataLoadMetrics())
       DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, loadModel,
         writeStepRowCounter, conf.value.value)
+      SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, loadModel.getMetrics)
     })
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
     // not have any functional impact as spark automatically monitors the cache usage on each node
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 68f994c..521f105 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -33,6 +33,7 @@ import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.{CollectionAccumulator, SparkUtil}
 
 import org.apache.carbondata.common.CarbonIterator
@@ -160,6 +161,7 @@ class NewCarbonDataLoadRDD[K, V](
         executor.execute(model,
           loader.storeLocation,
           recordReaders)
+        executor.close()
       } catch {
         case e: NoRetryException =>
           loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
@@ -174,6 +176,7 @@ class NewCarbonDataLoadRDD[K, V](
           LOGGER.error(e)
           throw e
       } finally {
+        SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, model.getMetrics)
         // clean up the folders and files created locally for data load operation
         TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
@@ -316,6 +319,7 @@ class NewDataFrameLoaderRDD[K, V](
             carbonLoadModel.getTableName,
             carbonLoadModel.getSegment.getSegmentNo))
         executor.execute(model, loader.storeLocation, recordReaders.toArray)
+        executor.close()
       } catch {
         case e: NoRetryException =>
           loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
@@ -328,6 +332,7 @@ class NewDataFrameLoaderRDD[K, V](
           LOGGER.error(e)
           throw e
       } finally {
+        SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, model.getMetrics)
         // clean up the folders and files created locally for data load operation
         TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index d067448..14fa4ac 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -29,10 +29,9 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.MergeIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{ObjectSerializationUtil, OutputFilesInfoHolder}
+import org.apache.carbondata.core.util.{DataLoadMetrics, ObjectSerializationUtil}
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -86,9 +85,9 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
               },
               currPartitionSpec = currPartitionSpecOption
             )
-            val outputFilesInfoHolder = new OutputFilesInfoHolder
-            loadModel.setOutputFilesInfoHolder(outputFilesInfoHolder)
-            loadModel.getOutputFilesInfoHolder.setMergeIndexSize(indexSize)
+            val metrics = new DataLoadMetrics
+            metrics.setMergeIndexSize(indexSize)
+            loadModel.setMetrics(metrics)
             LOGGER.info("Total time taken for merge index " +
                         (System.currentTimeMillis() - startTime))
             // clear Block dataMap Cache
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index aa1d294..2224943 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -42,8 +42,7 @@ import org.apache.carbondata.core.metadata.ColumnarFormatVersion
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataLoadMetrics}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.view.{MVSchema, MVStatus}
 import org.apache.carbondata.events._
@@ -177,6 +176,7 @@ case class CarbonAlterTableCompactionCommand(
         .getOrElse(CarbonCommonConstants.COMPRESSOR,
           CompressorFactory.getInstance().getCompressor.getName)
       carbonLoadModel.setColumnCompressor(columnCompressor)
+      carbonLoadModel.setMetrics(new DataLoadMetrics())
 
       var storeLocation = System.getProperty("java.io.tmpdir")
       storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index da46177..6ba1702 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types._
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.execution.command.management.CommonLoadUtils
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
@@ -46,7 +45,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverter, DataTypeConverterImpl, DataTypeUtil, ObjectSerializationUtil, OutputFilesInfoHolder, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonProperties, DataLoadMetrics, DataTypeConverter, DataTypeConverterImpl, DataTypeUtil, ObjectSerializationUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
@@ -96,7 +95,7 @@ with Serializable {
       .getOrElse(CarbonCommonConstants.COMPRESSOR,
         CompressorFactory.getInstance().getCompressor.getName)
     model.setColumnCompressor(columnCompressor)
-    model.setOutputFilesInfoHolder(new OutputFilesInfoHolder())
+    model.setMetrics(new DataLoadMetrics())
 
     val carbonProperty = CarbonProperties.getInstance()
     val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 435cb58..8bf3483 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -35,6 +36,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.DataLoadMetrics
 
 object SparkSQLUtil {
   def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -167,4 +169,11 @@ object SparkSQLUtil {
      */
     sparkSession.sqlContext.table(carbonTable.getTableName)
   }
+
+  def setOutputMetrics(outputMetrics: OutputMetrics, dataLoadMetrics: DataLoadMetrics): Unit = {
+    if (dataLoadMetrics != null && outputMetrics != null) {
+      outputMetrics.setBytesWritten(dataLoadMetrics.getNumOutputBytes)
+      outputMetrics.setRecordsWritten(dataLoadMetrics.getNumOutputRows)
+    }
+  }
 }
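
With setOutputMetrics in place, the totals surface wherever Spark already exposes output metrics: the web UI's "Output" column, event logs, and listeners. A sketch of observing them with a standard SparkListener; the listener class is illustrative, not part of this commit:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    class LoadMetricsListenerSketch extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // taskMetrics can be null for some failed tasks, so guard it
        Option(taskEnd.taskMetrics).foreach { tm =>
          println(s"task wrote ${tm.outputMetrics.recordsWritten} rows, " +
            s"${tm.outputMetrics.bytesWritten} bytes")
        }
      }
    }
    // registration: sparkContext.addSparkListener(new LoadMetricsListenerSketch())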
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index f56be66..9d30331 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 
 public class CarbonDataLoadConfiguration {
 
@@ -98,7 +98,7 @@ public class CarbonDataLoadConfiguration {
 
   private int numberOfLoadingCores;
 
-  private OutputFilesInfoHolder outputFilesInfoHolder;
+  private DataLoadMetrics metrics;
 
   /**
    * Whether index columns are present. This flag should be set only when all the schema
@@ -378,12 +378,12 @@ public class CarbonDataLoadConfiguration {
     this.segmentPath = segmentPath;
   }
 
-  public OutputFilesInfoHolder getOutputFilesInfoHolder() {
-    return outputFilesInfoHolder;
+  public DataLoadMetrics getMetrics() {
+    return metrics;
   }
 
-  public void setOutputFilesInfoHolder(OutputFilesInfoHolder outputFilesInfoHolder) {
-    this.outputFilesInfoHolder = outputFilesInfoHolder;
+  public void setMetrics(DataLoadMetrics metrics) {
+    this.metrics = metrics;
   }
 
   public boolean isIndexColumnsPresent() {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index a412f9a..100f772 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -210,7 +210,7 @@ public final class DataLoadProcessBuilder {
     }
     configuration.setSkipParsers(loadModel.isSkipParsers());
     configuration.setTaskNo(loadModel.getTaskNo());
-    configuration.setOutputFilesInfoHolder(loadModel.getOutputFilesInfoHolder());
+    configuration.setMetrics(loadModel.getMetrics());
     String[] complexDelimiters = new String[loadModel.getComplexDelimiters().size()];
     loadModel.getComplexDelimiters().toArray(complexDelimiters);
     configuration
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 1aba14c..9d8d792 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -28,7 +28,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 public class CarbonLoadModel implements Serializable {
@@ -226,7 +226,7 @@ public class CarbonLoadModel implements Serializable {
    */
   private int bucketId;
 
-  private OutputFilesInfoHolder outputFilesInfoHolder;
+  private DataLoadMetrics metrics;
 
   private boolean skipParsers = false;
 
@@ -428,7 +428,7 @@ public class CarbonLoadModel implements Serializable {
     copy.rangePartitionColumn = rangePartitionColumn;
     copy.scaleFactor = scaleFactor;
     copy.totalSize = totalSize;
-    copy.outputFilesInfoHolder = outputFilesInfoHolder;
+    copy.metrics = metrics;
     copy.isLoadWithoutConverterWithoutReArrangeStep = isLoadWithoutConverterWithoutReArrangeStep;
     return copy;
   }
@@ -482,7 +482,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.rangePartitionColumn = rangePartitionColumn;
     copyObj.scaleFactor = scaleFactor;
     copyObj.totalSize = totalSize;
-    copyObj.outputFilesInfoHolder = outputFilesInfoHolder;
+    copyObj.metrics = metrics;
     copyObj.isLoadWithoutConverterStep = isLoadWithoutConverterStep;
     copyObj.isLoadWithoutConverterWithoutReArrangeStep = isLoadWithoutConverterWithoutReArrangeStep;
     return copyObj;
@@ -881,12 +881,12 @@ public class CarbonLoadModel implements Serializable {
     return scaleFactor;
   }
 
-  public OutputFilesInfoHolder getOutputFilesInfoHolder() {
-    return outputFilesInfoHolder;
+  public DataLoadMetrics getMetrics() {
+    return metrics;
   }
 
-  public void setOutputFilesInfoHolder(OutputFilesInfoHolder outputFilesInfoHolder) {
-    this.outputFilesInfoHolder = outputFilesInfoHolder;
+  public void setMetrics(DataLoadMetrics metrics) {
+    this.metrics = metrics;
   }
 
   public boolean isIndexColumnsPresent() {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 59e6345..33eefa5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
@@ -296,6 +297,8 @@ public class CarbonLoadModelBuilder {
     validateAndSetBinaryDecoder(carbonLoadModel);
 
     validateRangeColumn(optionsFinal, carbonLoadModel);
+
+    carbonLoadModel.setMetrics(new DataLoadMetrics());
   }
 
   private void validateRangeColumn(Map<String, String> optionsFinal,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index fbb6252..ebd3f7d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -418,6 +418,9 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
           carbonFactHandler.closeHandler();
         }
       }
+      if (configuration.getMetrics() != null) {
+        configuration.getMetrics().addOutputRows(rowCounter.get());
+      }
     }
   }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index 464959c..676c40e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -258,6 +258,9 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
           carbonFactHandler.closeHandler();
         }
       }
+      if (configuration.getMetrics() != null) {
+        configuration.getMetrics().addOutputRows(rowCounter.get());
+      }
     }
   }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 0ad37e8..fd54135 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.index.IndexWriterListener;
@@ -160,7 +160,7 @@ public class CarbonFactDataHandlerModel {
   // this will help in knowing complex byte array will be divided into how may new pages.
   private int noDictAllComplexColumnDepth;
 
-  private OutputFilesInfoHolder outputFilesInfoHolder;
+  private DataLoadMetrics metrics;
 
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
@@ -249,8 +249,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.indexWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
     carbonFactDataHandlerModel.initNumberOfCores();
-    carbonFactDataHandlerModel
-        .setOutputFilesInfoHolder(configuration.getOutputFilesInfoHolder());
+    carbonFactDataHandlerModel.setMetrics(configuration.getMetrics());
     return carbonFactDataHandlerModel;
   }
 
@@ -321,7 +320,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel
         .setColumnLocalDictGenMap(CarbonUtil.getLocalDictionaryModel(carbonTable));
     carbonFactDataHandlerModel.sortScope = carbonTable.getSortScope();
-    carbonFactDataHandlerModel.setOutputFilesInfoHolder(loadModel.getOutputFilesInfoHolder());
+    carbonFactDataHandlerModel.setMetrics(loadModel.getMetrics());
     return carbonFactDataHandlerModel;
   }
 
@@ -657,12 +656,12 @@ public class CarbonFactDataHandlerModel {
     this.noDictAllComplexColumnDepth = noDictAllComplexColumnDepth;
   }
 
-  public OutputFilesInfoHolder getOutputFilesInfoHolder() {
-    return outputFilesInfoHolder;
+  public DataLoadMetrics getMetrics() {
+    return metrics;
   }
 
-  public void setOutputFilesInfoHolder(OutputFilesInfoHolder outputFilesInfoHolder) {
-    this.outputFilesInfoHolder = outputFilesInfoHolder;
+  public void setMetrics(DataLoadMetrics metrics) {
+    this.metrics = metrics;
   }
 }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index fdc717f..0dd17ee 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
@@ -150,7 +150,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
 
   protected ExecutorService fallbackExecutorService;
 
-  private OutputFilesInfoHolder outputFilesInfoHolder;
+  private DataLoadMetrics metrics;
 
   public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {
     this.model = model;
@@ -202,7 +202,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
           "FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId(),
               true));
     }
-    this.outputFilesInfoHolder = this.model.getOutputFilesInfoHolder();
+    this.metrics = this.model.getMetrics();
   }
 
   /**
@@ -270,7 +270,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
         } else {
           if (copyInCurrentThread) {
             CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
-                model.getCarbonDataDirectoryPath(), fileSizeInBytes, outputFilesInfoHolder);
+                model.getCarbonDataDirectoryPath(), fileSizeInBytes, metrics);
             FileFactory.deleteFile(carbonDataFileTempPath);
           } else {
             executorServiceSubmitList
@@ -438,7 +438,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     if (!enableDirectlyWriteDataToStorePath) {
       CarbonUtil
           .copyCarbonDataFileToCarbonStorePath(indexFileName, model.getCarbonDataDirectoryPath(),
-              fileSizeInBytes, outputFilesInfoHolder);
+              fileSizeInBytes, metrics);
       FileFactory.deleteFile(indexFileName);
     }
   }
@@ -504,7 +504,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     @Override
     public Void call() throws Exception {
       CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName, model.getCarbonDataDirectoryPath(),
-          fileSizeInBytes, outputFilesInfoHolder);
+          fileSizeInBytes, metrics);
       FileFactory.deleteFile(fileName);
       return null;
     }