You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/07/13 17:59:39 UTC

[carbondata] branch master updated: [CARBONDATA-3852] [CARBONDATA-3851] Fix Data update issue with SCD/CDC for partition table On spark cluster deploy mode

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ea253a1  [CARBONDATA-3852] [CARBONDATA-3851] Fix Data update issue with SCD/CDC for partition table On spark cluster deploy mode
ea253a1 is described below

commit ea253a11836840f207d0795e8ccaada4e1fd10db
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Fri Jul 10 22:58:02 2020 +0530

    [CARBONDATA-3852] [CARBONDATA-3851] Fix Data update issue with SCD/CDC for partition table On spark
    cluster deploy mode
    
    Why is this PR needed?
    Issue 1:
    On process IUD with mergeDataSetCommand, we are setting direct write to store path carbon property
    as true on the executor side and resetting the value to false on the driver side. Because of this,
    the update operation is unsuccessful, as Issue 2 comes into the picture.
    
    Issue 2:
    Currently, for insert/load data into partition table, data file will be moved to partition path
    and index files will remain in tmp file to improve commit job performance.
    When direct write to store path is enabled, data files are not moved to partition path and load
    metrics are not set and added to task context. On commitJob, it just returns, as index and carbon
    size will be 0. Thus, the corresponding load is not successful.
    
    What changes were proposed in this PR?
    For Issue 1:
    Set direct write to store path carbon property to default value on close writer while processIUD
    for merge data set command
    
    For Issue 2:
    1. While writing data file, check if it is a partition table and the dir path is the
    "partition tmp path"; if so, write the data file directly to the partition path.
    2. On close writer, add partition carbon data and index info to metrics
    
    This closes #3835
---
 .../mutation/merge/CarbonMergeDataSetCommand.scala |  9 ++++-
 .../store/writer/AbstractFactDataWriter.java       | 46 ++++++++++++++++++++--
 2 files changed, 49 insertions(+), 6 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index 4cee705..7abfc42 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -180,8 +180,6 @@ case class CarbonMergeDataSetCommand(
     } else {
       None
     }
-    CarbonProperties.getInstance().addProperty(CarbonLoadOptionConstants
-      .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "false")
 
     CarbonInsertIntoWithDf(
       databaseNameOp = Some(carbonTable.getDatabaseName),
@@ -271,6 +269,10 @@ case class CarbonMergeDataSetCommand(
     val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration)
     (frame.rdd.coalesce(DistributionUtil.getConfiguredExecutors(sparkSession.sparkContext)).
       mapPartitionsWithIndex { case (index, iter) =>
+        var directlyWriteDataToHdfs = CarbonProperties.getInstance()
+          .getProperty(CarbonLoadOptionConstants
+            .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, CarbonLoadOptionConstants
+            .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT)
         CarbonProperties.getInstance().addProperty(CarbonLoadOptionConstants
           .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true")
         val confB = config.value.value
@@ -283,6 +285,9 @@ case class CarbonMergeDataSetCommand(
           val queue = new util.LinkedList[InternalRow]()
           override def hasNext: Boolean = if (!queue.isEmpty || iter.hasNext) true else {
             writer.close()
+            // revert load direct write to store path after insert
+            CarbonProperties.getInstance().addProperty(CarbonLoadOptionConstants
+              .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, directlyWriteDataToHdfs)
             false
           }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 18911f4..637147c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -281,12 +281,29 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
         LOGGER.error(e);
       }
     } else {
-      if (currentFileSize == 0) {
-        try {
+      try {
+        if (currentFileSize == 0) {
           handleEmptyDataFile(carbonDataFileStorePath);
-        } catch (IOException e) {
-          LOGGER.error(e);
+        } else {
+          // In case if direct write data to store path is enabled, carbonData files will be written
+          // directly to partition path. Need to add partition path and carbon data file size info
+          // to load metrics
+          if (model.getTableSpec().getCarbonTable().isHivePartitionTable() && model
+              .getCarbonDataDirectoryPath().endsWith(".tmp") && carbonDataFileStorePath
+              .endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+            if (metrics != null) {
+              // get partition directory path
+              String partitionPath = model.getCarbonDataDirectoryPath()
+                  .substring(0, model.getCarbonDataDirectoryPath().lastIndexOf(File.separator));
+              metrics.addToPartitionPath(partitionPath);
+              String carbonFilePath = partitionPath + carbonDataFileStorePath
+                  .substring(carbonDataFileStorePath.lastIndexOf(File.separator));
+              addOutputFilesInfoToMetrics(carbonFilePath);
+            }
+          }
         }
+      } catch (IOException e) {
+        LOGGER.error(e);
       }
     }
   }
@@ -318,6 +335,13 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     try {
       FileFactory.mkdirs(model.getCarbonDataDirectoryPath());
       if (enableDirectlyWriteDataToStorePath) {
+        if (model.getTableSpec().getCarbonTable().isHivePartitionTable() && model
+            .getCarbonDataDirectoryPath().endsWith(".tmp")) {
+          // set carbonData file store path to partition path instead of tmp directory
+          carbonDataFileStorePath = model.getCarbonDataDirectoryPath()
+              .substring(0, model.getCarbonDataDirectoryPath().lastIndexOf(File.separator))
+              + File.separator + carbonDataFileName;
+        }
         // the block size will be twice the block_size specified by user to make sure that
         // one carbondata file only consists exactly one HDFS block.
         fileOutputStream = FileFactory
@@ -440,9 +464,23 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
           .copyCarbonDataFileToCarbonStorePath(indexFileName, model.getCarbonDataDirectoryPath(),
               fileSizeInBytes, metrics);
       FileFactory.deleteFile(indexFileName);
+    } else if (model.getTableSpec().getCarbonTable().isHivePartitionTable() && model
+        .getCarbonDataDirectoryPath().endsWith(".tmp")) {
+      if (metrics != null) {
+        addOutputFilesInfoToMetrics(indexFileName);
+      }
     }
   }
 
+  private void addOutputFilesInfoToMetrics(String carbonFilePath) {
+    // Store the number of files written by each task.
+    metrics.incrementCount();
+    // Store the files written by each task to metrics.
+    long targetSize = FileFactory.getCarbonFile(carbonFilePath).getSize();
+    metrics.addToOutputFiles(carbonFilePath + ":" + targetSize);
+    metrics.addOutputBytes(targetSize);
+  }
+
   /**
    * This method will close the executor service which is used for copying carbon
    * data files to carbon store path