You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2020/07/13 09:08:02 UTC

[GitHub] [carbondata] akashrn5 commented on a change in pull request #3835: [CARBONDATA-3852] [CARBONDATA-3851] Fix Data update issue with SCD/CDC for partition table On spark cluster deploy mode

akashrn5 commented on a change in pull request #3835:
URL: https://github.com/apache/carbondata/pull/3835#discussion_r453499751



##########
File path: processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
##########
@@ -318,6 +335,13 @@ public void initializeWriter() throws CarbonDataWriterException {
     try {
       FileFactory.mkdirs(model.getCarbonDataDirectoryPath());
       if (enableDirectlyWriteDataToStorePath) {
+        if (model.getTableSpec().getCarbonTable().isHivePartitionTable() && model
+            .getCarbonDataDirectoryPath().endsWith("tmp")) {
+          // set carbonData file store path to partition path instead of tmp directory
+          carbonDataFileStorePath = model.getCarbonDataDirectoryPath()
+              .substring(0, model.getCarbonDataDirectoryPath().lastIndexOf("/")) + File.separator

Review comment:
       use `CarbonCommonConstans.FILE_SEPARATOR`

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -283,6 +284,13 @@ case class CarbonMergeDataSetCommand(
           val queue = new util.LinkedList[InternalRow]()
           override def hasNext: Boolean = if (!queue.isEmpty || iter.hasNext) true else {
             writer.close()
+            // revert load direct write to store path after IUD

Review comment:
       update the command as its not IUD exactly, its just an insert operation

##########
File path: processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
##########
@@ -281,12 +281,29 @@ protected void commitCurrentFile(boolean copyInCurrentThread) {
         LOGGER.error(e);
       }
     } else {
-      if (currentFileSize == 0) {
-        try {
+      try {
+        if (currentFileSize == 0) {
           handleEmptyDataFile(carbonDataFileStorePath);
-        } catch (IOException e) {
-          LOGGER.error(e);
+        } else {
+          // In case if direct write data to store path is enabled, carbonData files will be written
+          // directly to partition path. Need to add partition path and carbon data file size info
+          // to load metrics
+          if (model.getTableSpec().getCarbonTable().isHivePartitionTable() && model
+              .getCarbonDataDirectoryPath().endsWith("tmp") && carbonDataFileStorePath

Review comment:
       check for ends with ` .tmp` similar to carbonUtil APIs

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -283,6 +284,13 @@ case class CarbonMergeDataSetCommand(
           val queue = new util.LinkedList[InternalRow]()
           override def hasNext: Boolean = if (!queue.isEmpty || iter.hasNext) true else {
             writer.close()
+            // revert load direct write to store path after IUD
+            if (null == directlyWriteDataToHdfs) {

Review comment:
       no need of null check, use .getProperty(property_value, default value) , so u can check null check

##########
File path: processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
##########
@@ -440,9 +464,23 @@ protected void writeIndexFile() throws IOException, CarbonDataWriterException {
           .copyCarbonDataFileToCarbonStorePath(indexFileName, model.getCarbonDataDirectoryPath(),
               fileSizeInBytes, metrics);
       FileFactory.deleteFile(indexFileName);
+    } else if (model.getTableSpec().getCarbonTable().isHivePartitionTable() && model
+        .getCarbonDataDirectoryPath().endsWith("tmp")) {

Review comment:
       same as above

##########
File path: processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
##########
@@ -318,6 +335,13 @@ public void initializeWriter() throws CarbonDataWriterException {
     try {
       FileFactory.mkdirs(model.getCarbonDataDirectoryPath());
       if (enableDirectlyWriteDataToStorePath) {
+        if (model.getTableSpec().getCarbonTable().isHivePartitionTable() && model
+            .getCarbonDataDirectoryPath().endsWith("tmp")) {

Review comment:
       same as above,` .tmp`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org