Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 15:50:14 UTC

[12/45] carbondata git commit: [HOTFIX] support "carbon.load.directWriteHdfs.enabled" for S3

[HOTFIX] support "carbon.load.directWriteHdfs.enabled" for S3

Problem: Currently for S3, when the above carbon property is set, the index file is not written to the S3 store path, due to a bug in how the folder path is built.

Solution: The file separator used was wrong (java.io.File.separator instead of the store-path separator CarbonCommonConstants.FILE_SEPARATOR); fix it.
Also rename the carbon property
"carbon.load.directWriteHdfs.enabled" to
"carbon.load.directWriteToStorePath.enabled"

This closes #2697
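
For reference, a minimal sketch of enabling the renamed property through the CarbonProperties API before a load (this mirrors the SDKS3Example change in this commit; the class name and the elided load code are illustrative, not part of the commit):

    import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    import org.apache.carbondata.core.util.CarbonProperties;

    public class EnableDirectWrite {
      public static void main(String[] args) {
        // Enable direct write to the store path (HDFS/S3) via the renamed constant.
        CarbonProperties.getInstance().addProperty(
            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true");
        // ... run the data load; carbondata files are now written directly to the store path ...
      }
    }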


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5d17ff40
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5d17ff40
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5d17ff40

Branch: refs/heads/branch-1.5
Commit: 5d17ff40bdeeba64a8885fa2df427fbdec6a38ea
Parents: 2a4f530
Author: ajantha-bhat <aj...@gmail.com>
Authored: Thu Sep 6 16:47:22 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Thu Sep 27 14:03:37 2018 +0530

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |  6 ++--
 docs/configuration-parameters.md                |  2 +-
 .../carbondata/examples/sdk/SDKS3Example.java   | 12 +++++++
 .../dataload/TestLoadDataGeneral.scala          |  8 ++---
 .../store/writer/AbstractFactDataWriter.java    | 38 ++++++++++----------
 5 files changed, 39 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index 3eab69d..82485ca 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -136,9 +136,9 @@ public final class CarbonLoadOptionConstants {
   public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";
 
   @CarbonProperty
-  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS
-      = "carbon.load.directWriteHdfs.enabled";
-  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT = "false";
+  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH
+      = "carbon.load.directWriteToStorePath.enabled";
+  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT = "false";
 
   /**
    * If the sort memory is insufficient, spill inmemory pages to disk.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 662525b..9dd8164 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -90,7 +90,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.prefetch.buffersize | 1000 | When the configuration ***carbon.merge.sort.prefetch*** is configured to true, we need to set the number of records that can be prefetched.This configuration is used specify the number of records to be prefetched.**NOTE: **Configuring more number of records to be prefetched increases memory footprint as more records will have to be kept in memory. |
 | load_min_size_inmb | 256 | This configuration is used along with ***carbon.load.min.size.enabled***.This determines the minimum size of input files to be considered for distribution among executors while data loading.**NOTE:** Refer to ***carbon.load.min.size.enabled*** for understanding when this configuration needs to be used and its advantages and disadvantages. |
 | carbon.load.sortmemory.spill.percentage | 0 | During data loading, some data pages are kept in memory upto memory configured in ***carbon.sort.storage.inmemory.size.inmb*** beyond which they are spilled to disk as intermediate temporary sort files.This configuration determines after what percentage data needs to be spilled to disk.**NOTE:** Without this configuration, when the data pages occupy upto configured memory, new data pages would be dumped to disk and old pages are still maintained in disk. |
-| carbon.load.directWriteHdfs.enabled | false | During data load all the carbondata files are written to local disk and finally copied to the target location in HDFS.Enabling this parameter will make carrbondata files to be written directly onto target HDFS location bypassing the local disk.**NOTE:** Writing directly to HDFS saves local disk IO(once for writing the files and again for copying to HDFS) there by improving the performance.But the drawback is when data loading fails or the application crashes, unwanted carbondata files will remain in the target HDFS location until it is cleared during next data load or by running *CLEAN FILES* DDL command |
+| carbon.load.directWriteToStorePath.enabled | false | During data load, all the carbondata files are written to local disk and finally copied to the target store location in HDFS/S3.Enabling this parameter will make carbondata files to be written directly onto target HDFS/S3 location bypassing the local disk.**NOTE:** Writing directly to HDFS/S3 saves local disk IO(once for writing the files and again for copying to HDFS/S3) there by improving the performance.But the drawback is when data loading fails or the application crashes, unwanted carbondata files will remain in the target HDFS/S3 location until it is cleared during next data load or by running *CLEAN FILES* DDL command |
 | carbon.options.serialization.null.format | \N | Based on the business scenarios, some columns might need to be loaded with null values.As null value cannot be written in csv files, some special characters might be adopted to specify null values.This configuration can be used to specify the null values format in the data being loaded. |
 | carbon.sort.storage.inmemory.size.inmb | 512 | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits.When ***enable.unsafe.sort*** configuration is enabled, instead of using ***carbon.sort.size*** which is based on rows count, size occupied in memory is used to determine when to flush data pages to intermediate temp files.This configuration determines the memory to be used for storing data pages in memory.**NOTE:** Configuring a higher values ensures more data is maintained in memory and hence increases data loading performance due to reduced or no IO.Based on the memory availability in the nodes of the cluster, configure the values accordingly. |
 | carbon.column.compressor | snappy | CarbonData will compress the column values using the compressor specified by this configuration. Currently CarbonData supports 'snappy' and 'zstd' compressors. |
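
As the updated documentation row notes, the property defaults to false. A hedged Java sketch of the read-with-default pattern the writer uses for this flag (same calls as in the AbstractFactDataWriter hunk below; the class and variable names here are illustrative):

    import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    import org.apache.carbondata.core.util.CarbonProperties;

    public class ReadDirectWriteFlag {
      public static void main(String[] args) {
        // Read the flag, falling back to its "false" default when unset.
        String enabled = CarbonProperties.getInstance().getProperty(
            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
        System.out.println("direct write enabled: " + "TRUE".equalsIgnoreCase(enabled));
      }
    }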

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index d4f49f5..bc0e280 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -19,10 +19,12 @@ package org.apache.carbondata.examples.sdk;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.sdk.file.*;
 
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +42,12 @@ public class SDKS3Example {
             System.exit(0);
         }
 
+        String backupProperty = CarbonProperties.getInstance()
+            .getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+                CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
+        CarbonProperties.getInstance()
+            .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true");
+
         String path = "s3a://sdk/WriterOutput";
         if (args.length > 3) {
             path=args[3];
@@ -87,5 +95,9 @@ public class SDKS3Example {
         }
         System.out.println("\nFinished");
         reader.close();
+
+        CarbonProperties.getInstance()
+            .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+                backupProperty);
     }
 }
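
The example above backs up the previous property value and restores it at the end. A sketch of a more defensive variant (a hypothetical refactor, not part of this commit) restores it in a finally block so a failure mid-run does not leak the global setting:

    import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    import org.apache.carbondata.core.util.CarbonProperties;

    public class DirectWriteWithRestore {
      public static void main(String[] args) {
        CarbonProperties props = CarbonProperties.getInstance();
        // Back up the current value (or the default if unset).
        String backup = props.getProperty(
            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
        props.addProperty(
            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true");
        try {
          // ... write and read through the SDK as in SDKS3Example ...
        } finally {
          // Restore the previous value even if the load fails.
          props.addProperty(
              CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, backup);
        }
      }
    }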

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 8b51090..02abb8d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -243,10 +243,10 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
 
   test("test data loading with directly writing fact data to hdfs") {
     val originStatus = CarbonProperties.getInstance().getProperty(
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT)
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT)
     CarbonProperties.getInstance().addProperty(
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS, "true")
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true")
 
     val testData = s"$resourcesPath/sample.csv"
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
@@ -256,7 +256,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     )
 
     CarbonProperties.getInstance().addProperty(
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
       originStatus)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 4afb3ef..37d33c2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -66,9 +66,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   protected WritableByteChannel fileChannel;
   protected long currentOffsetInFile;
   /**
-   * The path of CarbonData file to write in hdfs
+   * The path of CarbonData file to write in hdfs/s3
    */
-  private String carbonDataFileHdfsPath;
+  private String carbonDataFileStorePath;
   /**
    * The temp path of carbonData file used on executor
    */
@@ -145,9 +145,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    */
   protected DataMapWriterListener listener;
   /**
-   * Whether directly write fact data to hdfs
+   * Whether directly write fact data to store path
    */
-  private boolean enableDirectlyWriteData2Hdfs = false;
+  private boolean enableDirectlyWriteDataToStorePath = false;
 
   protected ExecutorService fallbackExecutorService;
 
@@ -172,11 +172,11 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
 
     // whether to directly write fact data to HDFS
     String directlyWriteData2Hdfs = propInstance
-        .getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
-            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
-    this.enableDirectlyWriteData2Hdfs = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
+        .getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+            CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
+    this.enableDirectlyWriteDataToStorePath = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
 
-    if (enableDirectlyWriteData2Hdfs) {
+    if (enableDirectlyWriteDataToStorePath) {
       LOGGER.info("Carbondata will directly write fact data to HDFS.");
     } else {
       LOGGER.info("Carbondata will write temporary fact data to local disk.");
@@ -225,7 +225,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
       // set the current file size to zero
       String activeFile =
-          enableDirectlyWriteData2Hdfs ? carbonDataFileHdfsPath : carbonDataFileTempPath;
+          enableDirectlyWriteDataToStorePath ? carbonDataFileStorePath : carbonDataFileTempPath;
       LOGGER.info("Writing data to file as max file size reached for file: "
           + activeFile + ". Data block size: " + currentFileSize);
       // write meta data to end of the existing file
@@ -269,7 +269,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   protected void commitCurrentFile(boolean copyInCurrentThread) {
     notifyDataMapBlockEnd();
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    if (!enableDirectlyWriteData2Hdfs) {
+    if (!enableDirectlyWriteDataToStorePath) {
       try {
         if (copyInCurrentThread) {
           CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
@@ -296,14 +296,14 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
         .getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(),
             model.getBucketId(), model.getTaskExtension(),
             "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
-    this.carbonDataFileHdfsPath = model.getCarbonDataDirectoryPath() + File.separator
+    this.carbonDataFileStorePath = model.getCarbonDataDirectoryPath() + File.separator
         + carbonDataFileName;
     try {
-      if (enableDirectlyWriteData2Hdfs) {
+      if (enableDirectlyWriteDataToStorePath) {
         // the block size will be twice the block_size specified by user to make sure that
         // one carbondata file only consists exactly one HDFS block.
         fileOutputStream = FileFactory
-            .getDataOutputStream(carbonDataFileHdfsPath, FileFactory.FileType.HDFS,
+            .getDataOutputStream(carbonDataFileStorePath, FileFactory.FileType.HDFS,
                 CarbonCommonConstants.BYTEBUFFER_SIZE, fileSizeInBytes * 2);
       } else {
         //each time we initialize writer, we choose a local temp location randomly
@@ -380,11 +380,11 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
     String indexFileName;
-    if (enableDirectlyWriteData2Hdfs) {
-      String rawFileName = model.getCarbonDataDirectoryPath() + File.separator + CarbonTablePath
-          .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
-              model.getBucketId(), model.getTaskExtension(),
-              "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
+    if (enableDirectlyWriteDataToStorePath) {
+      String rawFileName = model.getCarbonDataDirectoryPath() + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonTablePath.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
+          model.getBucketId(), model.getTaskExtension(),
+          "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
       indexFileName = FileFactory.getUpdatedFilePath(rawFileName, FileFactory.FileType.HDFS);
     } else {
       // randomly choose a temp location for index file
@@ -407,7 +407,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       writer.writeThrift(blockIndex);
     }
     writer.close();
-    if (!enableDirectlyWriteData2Hdfs) {
+    if (!enableDirectlyWriteDataToStorePath) {
       CarbonUtil
           .copyCarbonDataFileToCarbonStorePath(indexFileName, model.getCarbonDataDirectoryPath(),
               fileSizeInBytes);
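
The root cause fixed in the index-file hunk above: java.io.File.separator is the local platform separator (a backslash on Windows), while HDFS/S3 paths always use a forward slash, which CarbonCommonConstants.FILE_SEPARATOR provides. A minimal self-contained sketch of the difference (the path value and the local stand-in for the constant are illustrative):

    import java.io.File;

    public class SeparatorDemo {
      // Stand-in for CarbonCommonConstants.FILE_SEPARATOR ("/").
      private static final String FILE_SEPARATOR = "/";

      public static void main(String[] args) {
        String dir = "s3a://sdk/WriterOutput/Fact/Part0/Segment_0";
        // On Windows this prints a broken store path containing a backslash:
        System.out.println(dir + File.separator + "xxx.carbonindex");
        // This always yields a valid HDFS/S3 path:
        System.out.println(dir + FILE_SEPARATOR + "xxx.carbonindex");
      }
    }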