Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/05/11 05:24:55 UTC
[carbondata] branch master updated: [CARBONDATA-3812] Set output metrics for data load spark job
This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 8196dc2 [CARBONDATA-3812] Set output metrics for data load spark job
8196dc2 is described below
commit 8196dc26c7e0924fad54aba558df9dbea5000131
Author: QiangCai <qi...@qq.com>
AuthorDate: Sat May 9 11:52:24 2020 +0800
[CARBONDATA-3812] Set output metrics for data load spark job
Why is this PR needed?
Data load jobs are missing output metrics. Please check the details in JIRA: CARBONDATA-3812
What changes were proposed in this PR?
Refactor OutputFilesInfoHolder into DataLoadMetrics
Add metrics: numOutputBytes and numOutputRows
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3757
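For reference, a minimal sketch of how the renamed metrics are intended to flow in a write task. Class and method names are taken from the diff below; writeWithMetrics is a hypothetical wrapper, not part of this patch, and the real call sites are in DataLoadProcessBuilderOnSpark and NewCarbonDataLoadRDD:

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.util.SparkSQLUtil
    import org.apache.carbondata.core.util.DataLoadMetrics
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel

    // Hypothetical helper illustrating the flow only.
    def writeWithMetrics(context: TaskContext, loadModel: CarbonLoadModel): Unit = {
      // attach a fresh per-task metrics object to the load model
      loadModel.setMetrics(new DataLoadMetrics())
      // ... the write step runs here; AbstractFactDataWriter calls
      //     addOutputBytes() for each file copied to the store path and the
      //     writer steps call addOutputRows() with the task's row counter ...
      // finally, publish the accumulated values as Spark task output metrics
      SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, loadModel.getMetrics)
    }

With OutputMetrics populated, the bytes and records written by load jobs become visible in the Spark stage/task metrics.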
---
.../apache/carbondata/core/util/CarbonUtil.java | 13 ++---
...utFilesInfoHolder.java => DataLoadMetrics.java} | 60 ++++++++++++++--------
.../hadoop/api/CarbonOutputCommitter.java | 2 +-
.../hadoop/api/CarbonTableOutputFormat.java | 20 ++++----
.../carbondata/hive/MapredCarbonOutputFormat.java | 2 +-
.../carbondata/hive/util/HiveCarbonUtil.java | 4 +-
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 4 ++
.../spark/rdd/NewCarbonDataLoadRDD.scala | 5 ++
.../spark/sql/events/MergeIndexEventListener.scala | 9 ++--
.../CarbonAlterTableCompactionCommand.scala | 4 +-
.../datasources/SparkCarbonTableFormat.scala | 5 +-
.../org/apache/spark/sql/util/SparkSQLUtil.scala | 9 ++++
.../loading/CarbonDataLoadConfiguration.java | 12 ++---
.../processing/loading/DataLoadProcessBuilder.java | 2 +-
.../processing/loading/model/CarbonLoadModel.java | 16 +++---
.../loading/model/CarbonLoadModelBuilder.java | 3 ++
.../CarbonRowDataWriterProcessorStepImpl.java | 3 ++
.../loading/steps/DataWriterProcessorStepImpl.java | 3 ++
.../store/CarbonFactDataHandlerModel.java | 17 +++---
.../store/writer/AbstractFactDataWriter.java | 12 ++---
20 files changed, 125 insertions(+), 80 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ed33aa2..57bb093 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2628,25 +2628,26 @@ public final class CarbonUtil {
public static void copyCarbonDataFileToCarbonStorePath(
String localFilePath,
String targetPath, long fileSizeInBytes,
- OutputFilesInfoHolder outputFilesInfoHolder) throws CarbonDataWriterException {
+ DataLoadMetrics metrics) throws CarbonDataWriterException {
if (targetPath.endsWith(".tmp") && localFilePath
.endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
// for partition case, write carbondata file directly to final path, keep index in temp path.
// This can improve the commit job performance on s3a.
targetPath =
targetPath.substring(0, targetPath.lastIndexOf("/"));
- if (outputFilesInfoHolder != null) {
- outputFilesInfoHolder.addToPartitionPath(targetPath);
+ if (metrics != null) {
+ metrics.addToPartitionPath(targetPath);
}
}
long targetSize = copyCarbonDataFileToCarbonStorePath(localFilePath, targetPath,
fileSizeInBytes);
- if (outputFilesInfoHolder != null) {
+ if (metrics != null) {
// Storing the number of files written by each task.
- outputFilesInfoHolder.incrementCount();
+ metrics.incrementCount();
// Storing the files written by each task.
- outputFilesInfoHolder.addToOutputFiles(targetPath + localFilePath
+ metrics.addToOutputFiles(targetPath + localFilePath
.substring(localFilePath.lastIndexOf(File.separator)) + ":" + targetSize);
+ metrics.addOutputBytes(targetSize);
}
}
diff --git a/core/src/main/java/org/apache/carbondata/core/util/OutputFilesInfoHolder.java b/core/src/main/java/org/apache/carbondata/core/util/DataLoadMetrics.java
similarity index 73%
rename from core/src/main/java/org/apache/carbondata/core/util/OutputFilesInfoHolder.java
rename to core/src/main/java/org/apache/carbondata/core/util/DataLoadMetrics.java
index 24d3ecd..273c9eb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/OutputFilesInfoHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataLoadMetrics.java
@@ -21,10 +21,10 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-public class OutputFilesInfoHolder implements Serializable {
-
- private static final long serialVersionUID = -1401375818456585241L;
-
+/**
+ * store data loading metrics
+ */
+public class DataLoadMetrics implements Serializable {
// stores the count of files written per task
private int fileCount;
@@ -37,6 +37,38 @@ public class OutputFilesInfoHolder implements Serializable {
private long mergeIndexSize;
+ private long numOutputBytes = 0L;
+
+ private long numOutputRows = 0L;
+
+ public synchronized int getFileCount() {
+ return fileCount;
+ }
+
+ public synchronized List<String> getOutputFiles() {
+ return outputFiles;
+ }
+
+ public synchronized List<String> getPartitionPath() {
+ return partitionPath;
+ }
+
+ public long getMergeIndexSize() {
+ return mergeIndexSize;
+ }
+
+ public void setMergeIndexSize(long mergeIndexSize) {
+ this.mergeIndexSize = mergeIndexSize;
+ }
+
+ public synchronized long getNumOutputBytes() {
+ return numOutputBytes;
+ }
+
+ public synchronized long getNumOutputRows() {
+ return numOutputRows;
+ }
+
public synchronized void incrementCount() {
// can call in multiple threads in single task
fileCount++;
@@ -56,23 +88,11 @@ public class OutputFilesInfoHolder implements Serializable {
partitionPath.add(path);
}
- public int getFileCount() {
- return fileCount;
- }
-
- public List<String> getOutputFiles() {
- return outputFiles;
+ public synchronized void addOutputBytes(long numOutputBytes) {
+ this.numOutputBytes += numOutputBytes;
}
- public List<String> getPartitionPath() {
- return partitionPath;
- }
-
- public long getMergeIndexSize() {
- return mergeIndexSize;
- }
-
- public void setMergeIndexSize(long mergeIndexSize) {
- this.mergeIndexSize = mergeIndexSize;
+ public synchronized void addOutputRows(long numOutputRows) {
+ this.numOutputRows += numOutputRows;
}
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 33979e5..02c8d4c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -285,7 +285,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
String segmentFileName = SegmentFileStore.genSegmentFileName(
loadModel.getSegmentId(), String.valueOf(loadModel.getFactTimeStamp()));
newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
- newMetaEntry.setIndexSize("" + loadModel.getOutputFilesInfoHolder().getMergeIndexSize());
+ newMetaEntry.setIndexSize("" + loadModel.getMetrics().getMergeIndexSize());
if (!StringUtils.isEmpty(size)) {
newMetaEntry.setDataSize(size);
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 200eb44..ebac3d4 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -38,9 +38,9 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
+import org.apache.carbondata.core.util.DataLoadMetrics;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
import org.apache.carbondata.processing.loading.ComplexDelimitersEnum;
@@ -242,7 +242,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
final TaskAttemptContext taskAttemptContext) throws IOException {
final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
- loadModel.setOutputFilesInfoHolder(new OutputFilesInfoHolder());
+ loadModel.setMetrics(new DataLoadMetrics());
String appName =
taskAttemptContext.getConfiguration().get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
if (null != appName) {
@@ -317,7 +317,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
model.setTableName(CarbonTableOutputFormat.getTableName(conf));
model.setCarbonTransactionalTable(true);
- model.setOutputFilesInfoHolder(new OutputFilesInfoHolder());
+ model.setMetrics(new DataLoadMetrics());
CarbonTable carbonTable = getCarbonTable(conf);
// global dictionary is not supported since 2.0
@@ -482,17 +482,17 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
// clean up the folders and files created locally for data load operation
TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
}
- OutputFilesInfoHolder outputFilesInfoHolder = loadModel.getOutputFilesInfoHolder();
- if (null != outputFilesInfoHolder) {
+ DataLoadMetrics metrics = loadModel.getMetrics();
+ if (null != metrics) {
taskAttemptContext.getConfiguration()
- .set("carbon.number.of.output.files", outputFilesInfoHolder.getFileCount() + "");
- if (outputFilesInfoHolder.getOutputFiles() != null) {
+ .set("carbon.number.of.output.files", metrics.getFileCount() + "");
+ if (metrics.getOutputFiles() != null) {
appendConfiguration(taskAttemptContext.getConfiguration(), "carbon.output.files.name",
- outputFilesInfoHolder.getOutputFiles());
+ metrics.getOutputFiles());
}
- if (outputFilesInfoHolder.getPartitionPath() != null) {
+ if (metrics.getPartitionPath() != null) {
appendConfiguration(taskAttemptContext.getConfiguration(),
- "carbon.output.partitions.name", outputFilesInfoHolder.getPartitionPath());
+ "carbon.output.partitions.name", metrics.getPartitionPath());
}
}
LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID());
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
index 3a8c7a7..a486fc0 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
@@ -102,7 +102,7 @@ public class MapredCarbonOutputFormat<T> extends CarbonTableOutputFormat
partitionInfo != null ? partitionInfo.getColumnSchemaList().size() : 0;
String finalOutputPath = FileFactory.getCarbonFile(finalOutPath.toString()).getAbsolutePath();
if (carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().isHivePartitionTable()) {
- carbonLoadModel.getOutputFilesInfoHolder().addToPartitionPath(finalOutputPath);
+ carbonLoadModel.getMetrics().addToPartitionPath(finalOutputPath);
context.getConfiguration().set("carbon.outputformat.writepath", finalOutputPath);
}
CarbonTableOutputFormat.setLoadModel(jc, carbonLoadModel);
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
index b611edb..29b2e4d 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
@@ -49,7 +49,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.ThriftWriter;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -151,7 +151,7 @@ public class HiveCarbonUtil {
throw new RuntimeException(e);
}
loadModel.setSkipParsers();
- loadModel.setOutputFilesInfoHolder(new OutputFilesInfoHolder());
+ loadModel.setMetrics(new DataLoadMetrics());
return loadModel;
}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 8fb3002..55eee11 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -165,8 +165,10 @@ object DataLoadProcessBuilderOnSpark {
sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator)
val loadModel = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
+ loadModel.setMetrics(new DataLoadMetrics())
DataLoadProcessorStepOnSpark.writeFunc(
rows, context.partitionId, loadModel, writeStepRowCounter, conf.value.value)
+ SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, loadModel.getMetrics)
})
// clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
@@ -252,8 +254,10 @@ object DataLoadProcessBuilderOnSpark {
sc.runJob(newRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator)
val loadModel = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
+ loadModel.setMetrics(new DataLoadMetrics())
DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, loadModel,
writeStepRowCounter, conf.value.value)
+ SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, loadModel.getMetrics)
})
// clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
// not have any functional impact as spark automatically monitors the cache usage on each node
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 68f994c..521f105 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -33,6 +33,7 @@ import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.util.{CollectionAccumulator, SparkUtil}
import org.apache.carbondata.common.CarbonIterator
@@ -160,6 +161,7 @@ class NewCarbonDataLoadRDD[K, V](
executor.execute(model,
loader.storeLocation,
recordReaders)
+ executor.close()
} catch {
case e: NoRetryException =>
loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
@@ -174,6 +176,7 @@ class NewCarbonDataLoadRDD[K, V](
LOGGER.error(e)
throw e
} finally {
+ SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, model.getMetrics)
// clean up the folders and files created locally for data load operation
TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
// in case of failure the same operation will be re-tried several times.
@@ -316,6 +319,7 @@ class NewDataFrameLoaderRDD[K, V](
carbonLoadModel.getTableName,
carbonLoadModel.getSegment.getSegmentNo))
executor.execute(model, loader.storeLocation, recordReaders.toArray)
+ executor.close()
} catch {
case e: NoRetryException =>
loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
@@ -328,6 +332,7 @@ class NewDataFrameLoaderRDD[K, V](
LOGGER.error(e)
throw e
} finally {
+ SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, model.getMetrics)
// clean up the folders and files created locally for data load operation
TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
// in case of failure the same operation will be re-tried several times.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index d067448..14fa4ac 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -29,10 +29,9 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.MergeIndexUtil
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.index.Segment
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{ObjectSerializationUtil, OutputFilesInfoHolder}
+import org.apache.carbondata.core.util.{DataLoadMetrics, ObjectSerializationUtil}
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -86,9 +85,9 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
},
currPartitionSpec = currPartitionSpecOption
)
- val outputFilesInfoHolder = new OutputFilesInfoHolder
- loadModel.setOutputFilesInfoHolder(outputFilesInfoHolder)
- loadModel.getOutputFilesInfoHolder.setMergeIndexSize(indexSize)
+ val metrics = new DataLoadMetrics
+ metrics.setMergeIndexSize(indexSize)
+ loadModel.setMetrics(metrics)
LOGGER.info("Total time taken for merge index " +
(System.currentTimeMillis() - startTime))
// clear Block dataMap Cache
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index aa1d294..2224943 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -42,8 +42,7 @@ import org.apache.carbondata.core.metadata.ColumnarFormatVersion
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataLoadMetrics}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.view.{MVSchema, MVStatus}
import org.apache.carbondata.events._
@@ -177,6 +176,7 @@ case class CarbonAlterTableCompactionCommand(
.getOrElse(CarbonCommonConstants.COMPRESSOR,
CompressorFactory.getInstance().getCompressor.getName)
carbonLoadModel.setColumnCompressor(columnCompressor)
+ carbonLoadModel.setMetrics(new DataLoadMetrics())
var storeLocation = System.getProperty("java.io.tmpdir")
storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index da46177..6ba1702 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types._
import org.apache.spark.TaskContext
-import org.apache.spark.sql.execution.command.management.CommonLoadUtils
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datastore.compression.CompressorFactory
@@ -46,7 +45,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverter, DataTypeConverterImpl, DataTypeUtil, ObjectSerializationUtil, OutputFilesInfoHolder, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonProperties, DataLoadMetrics, DataTypeConverter, DataTypeConverterImpl, DataTypeUtil, ObjectSerializationUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
@@ -96,7 +95,7 @@ with Serializable {
.getOrElse(CarbonCommonConstants.COMPRESSOR,
CompressorFactory.getInstance().getCompressor.getName)
model.setColumnCompressor(columnCompressor)
- model.setOutputFilesInfoHolder(new OutputFilesInfoHolder())
+ model.setMetrics(new DataLoadMetrics())
val carbonProperty = CarbonProperties.getInstance()
val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 435cb58..8bf3483 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.util
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.executor.OutputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
@@ -35,6 +36,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.DataLoadMetrics
object SparkSQLUtil {
def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -167,4 +169,11 @@ object SparkSQLUtil {
*/
sparkSession.sqlContext.table(carbonTable.getTableName)
}
+
+ def setOutputMetrics(outputMetrics: OutputMetrics, dataLoadMetrics: DataLoadMetrics): Unit = {
+ if (dataLoadMetrics != null && outputMetrics != null) {
+ outputMetrics.setBytesWritten(dataLoadMetrics.getNumOutputBytes)
+ outputMetrics.setRecordsWritten(dataLoadMetrics.getNumOutputRows)
+ }
+ }
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index f56be66..9d30331 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
public class CarbonDataLoadConfiguration {
@@ -98,7 +98,7 @@ public class CarbonDataLoadConfiguration {
private int numberOfLoadingCores;
- private OutputFilesInfoHolder outputFilesInfoHolder;
+ private DataLoadMetrics metrics;
/**
* Whether index columns are present. This flag should be set only when all the schema
@@ -378,12 +378,12 @@ public class CarbonDataLoadConfiguration {
this.segmentPath = segmentPath;
}
- public OutputFilesInfoHolder getOutputFilesInfoHolder() {
- return outputFilesInfoHolder;
+ public DataLoadMetrics getMetrics() {
+ return metrics;
}
- public void setOutputFilesInfoHolder(OutputFilesInfoHolder outputFilesInfoHolder) {
- this.outputFilesInfoHolder = outputFilesInfoHolder;
+ public void setMetrics(DataLoadMetrics metrics) {
+ this.metrics = metrics;
}
public boolean isIndexColumnsPresent() {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index a412f9a..100f772 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -210,7 +210,7 @@ public final class DataLoadProcessBuilder {
}
configuration.setSkipParsers(loadModel.isSkipParsers());
configuration.setTaskNo(loadModel.getTaskNo());
- configuration.setOutputFilesInfoHolder(loadModel.getOutputFilesInfoHolder());
+ configuration.setMetrics(loadModel.getMetrics());
String[] complexDelimiters = new String[loadModel.getComplexDelimiters().size()];
loadModel.getComplexDelimiters().toArray(complexDelimiters);
configuration
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 1aba14c..9d8d792 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -28,7 +28,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
import org.apache.carbondata.core.util.path.CarbonTablePath;
public class CarbonLoadModel implements Serializable {
@@ -226,7 +226,7 @@ public class CarbonLoadModel implements Serializable {
*/
private int bucketId;
- private OutputFilesInfoHolder outputFilesInfoHolder;
+ private DataLoadMetrics metrics;
private boolean skipParsers = false;
@@ -428,7 +428,7 @@ public class CarbonLoadModel implements Serializable {
copy.rangePartitionColumn = rangePartitionColumn;
copy.scaleFactor = scaleFactor;
copy.totalSize = totalSize;
- copy.outputFilesInfoHolder = outputFilesInfoHolder;
+ copy.metrics = metrics;
copy.isLoadWithoutConverterWithoutReArrangeStep = isLoadWithoutConverterWithoutReArrangeStep;
return copy;
}
@@ -482,7 +482,7 @@ public class CarbonLoadModel implements Serializable {
copyObj.rangePartitionColumn = rangePartitionColumn;
copyObj.scaleFactor = scaleFactor;
copyObj.totalSize = totalSize;
- copyObj.outputFilesInfoHolder = outputFilesInfoHolder;
+ copyObj.metrics = metrics;
copyObj.isLoadWithoutConverterStep = isLoadWithoutConverterStep;
copyObj.isLoadWithoutConverterWithoutReArrangeStep = isLoadWithoutConverterWithoutReArrangeStep;
return copyObj;
@@ -881,12 +881,12 @@ public class CarbonLoadModel implements Serializable {
return scaleFactor;
}
- public OutputFilesInfoHolder getOutputFilesInfoHolder() {
- return outputFilesInfoHolder;
+ public DataLoadMetrics getMetrics() {
+ return metrics;
}
- public void setOutputFilesInfoHolder(OutputFilesInfoHolder outputFilesInfoHolder) {
- this.outputFilesInfoHolder = outputFilesInfoHolder;
+ public void setMetrics(DataLoadMetrics metrics) {
+ this.metrics = metrics;
}
public boolean isIndexColumnsPresent() {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 59e6345..33eefa5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataLoadMetrics;
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
@@ -296,6 +297,8 @@ public class CarbonLoadModelBuilder {
validateAndSetBinaryDecoder(carbonLoadModel);
validateRangeColumn(optionsFinal, carbonLoadModel);
+
+ carbonLoadModel.setMetrics(new DataLoadMetrics());
}
private void validateRangeColumn(Map<String, String> optionsFinal,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index fbb6252..ebd3f7d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -418,6 +418,9 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
carbonFactHandler.closeHandler();
}
}
+ if (configuration.getMetrics() != null) {
+ configuration.getMetrics().addOutputRows(rowCounter.get());
+ }
}
}
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index 464959c..676c40e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -258,6 +258,9 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
carbonFactHandler.closeHandler();
}
}
+ if (configuration.getMetrics() != null) {
+ configuration.getMetrics().addOutputRows(rowCounter.get());
+ }
}
}
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 0ad37e8..fd54135 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.index.IndexWriterListener;
@@ -160,7 +160,7 @@ public class CarbonFactDataHandlerModel {
// this will help in knowing complex byte array will be divided into how may new pages.
private int noDictAllComplexColumnDepth;
- private OutputFilesInfoHolder outputFilesInfoHolder;
+ private DataLoadMetrics metrics;
/**
* Create the model using @{@link CarbonDataLoadConfiguration}
@@ -249,8 +249,7 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.indexWriterlistener = listener;
carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
carbonFactDataHandlerModel.initNumberOfCores();
- carbonFactDataHandlerModel
- .setOutputFilesInfoHolder(configuration.getOutputFilesInfoHolder());
+ carbonFactDataHandlerModel.setMetrics(configuration.getMetrics());
return carbonFactDataHandlerModel;
}
@@ -321,7 +320,7 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel
.setColumnLocalDictGenMap(CarbonUtil.getLocalDictionaryModel(carbonTable));
carbonFactDataHandlerModel.sortScope = carbonTable.getSortScope();
- carbonFactDataHandlerModel.setOutputFilesInfoHolder(loadModel.getOutputFilesInfoHolder());
+ carbonFactDataHandlerModel.setMetrics(loadModel.getMetrics());
return carbonFactDataHandlerModel;
}
@@ -657,12 +656,12 @@ public class CarbonFactDataHandlerModel {
this.noDictAllComplexColumnDepth = noDictAllComplexColumnDepth;
}
- public OutputFilesInfoHolder getOutputFilesInfoHolder() {
- return outputFilesInfoHolder;
+ public DataLoadMetrics getMetrics() {
+ return metrics;
}
- public void setOutputFilesInfoHolder(OutputFilesInfoHolder outputFilesInfoHolder) {
- this.outputFilesInfoHolder = outputFilesInfoHolder;
+ public void setMetrics(DataLoadMetrics metrics) {
+ this.metrics = metrics;
}
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index fdc717f..0dd17ee 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
@@ -150,7 +150,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
protected ExecutorService fallbackExecutorService;
- private OutputFilesInfoHolder outputFilesInfoHolder;
+ private DataLoadMetrics metrics;
public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {
this.model = model;
@@ -202,7 +202,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
"FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId(),
true));
}
- this.outputFilesInfoHolder = this.model.getOutputFilesInfoHolder();
+ this.metrics = this.model.getMetrics();
}
/**
@@ -270,7 +270,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
} else {
if (copyInCurrentThread) {
CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
- model.getCarbonDataDirectoryPath(), fileSizeInBytes, outputFilesInfoHolder);
+ model.getCarbonDataDirectoryPath(), fileSizeInBytes, metrics);
FileFactory.deleteFile(carbonDataFileTempPath);
} else {
executorServiceSubmitList
@@ -438,7 +438,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
if (!enableDirectlyWriteDataToStorePath) {
CarbonUtil
.copyCarbonDataFileToCarbonStorePath(indexFileName, model.getCarbonDataDirectoryPath(),
- fileSizeInBytes, outputFilesInfoHolder);
+ fileSizeInBytes, metrics);
FileFactory.deleteFile(indexFileName);
}
}
@@ -504,7 +504,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
@Override
public Void call() throws Exception {
CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName, model.getCarbonDataDirectoryPath(),
- fileSizeInBytes, outputFilesInfoHolder);
+ fileSizeInBytes, metrics);
FileFactory.deleteFile(fileName);
return null;
}