Posted to commits@carbondata.apache.org by li...@apache.org on 2020/06/24 08:10:45 UTC

[carbondata] branch master updated: [CARBONDATA-3862] Insert stage performance optimization

This is an automated email from the ASF dual-hosted git repository.

liuzhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 755d78c  [CARBONDATA-3862] Insert stage performance optimization
755d78c is described below

commit 755d78c03e7e08f5b6763f9b206b7ebf3ca2daf2
Author: haomarch <ma...@126.com>
AuthorDate: Mon Jun 22 05:21:27 2020 +0800

    [CARBONDATA-3862] Insert stage performance optimization
    
    Why is this PR needed?
    There are two major performance bottlenecks in 'insert stage':
    1) Getting the last-modified time of stage files requires many accesses to OBS.
    2) Parallelism is not supported.
    
    What changes were proposed in this PR?
    1) Cache the last-modified time while listing the stage files.
    2) Support insert stage in parallel. A 'loading' tag is added to the stages
    being processed, so that different insert stage processes can load different
    data independently by choosing only the stages without a 'loading' tag, or
    the stages whose load has timed out. This avoids loading the same data in
    concurrent insert stage processes. The 'loading' tag is simply an empty file
    whose name carries a '.loading' suffix (see the sketch after this list).
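
    As an illustration, here is a minimal sketch of this tag protocol, condensed
    from the patch below. The helper name 'tryTagStage' and the 'timeoutMs'
    parameter are hypothetical; the patch itself spreads this logic across
    listStageFiles and createStageLoadingFiles:

        import org.apache.carbondata.core.datastore.impl.FileFactory
        import org.apache.carbondata.core.util.path.CarbonTablePath

        // Returns true when this process owns the stage and may load it.
        def tryTagStage(stageFilePath: String, timeoutMs: Long): Boolean = {
          val loadingFile = FileFactory.getCarbonFile(
            stageFilePath + CarbonTablePath.LOADING_FILE_SUBFIX)
          if (loadingFile.createNewFile()) {
            // no tag existed: this process tagged the stage first
            true
          } else if (System.currentTimeMillis() - loadingFile.getLastModifiedTime
              >= timeoutMs) {
            // the previous load timed out: take over by refreshing the tag
            loadingFile.setLastModifiedTime(System.currentTimeMillis())
          } else {
            // another process is still loading this stage
            false
          }
        }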
    
    Does this PR introduce any user interface change?
    NO
    
    Is any new testcase added?
    YES
    
    This closes #3799
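
    For reference, a minimal sketch of overriding the new timeout through
    CarbonProperties (the value is in milliseconds; the 1-hour value below is
    an arbitrary example, the default being 8 hours):

        import org.apache.carbondata.core.util.CarbonProperties

        // shorten the insert stage timeout from the 8-hour default to 1 hour
        CarbonProperties.getInstance()
          .addProperty("carbon.insert.stage.timeout", "3600000")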
---
 .../core/constants/CarbonCommonConstants.java      |  10 ++
 .../filesystem/AbstractDFSCarbonFile.java          |   5 +
 .../carbondata/core/util/CarbonProperties.java     |  32 +++++
 .../carbondata/core/util/path/CarbonTablePath.java |   1 +
 docs/configuration-parameters.md                   |   1 +
 .../carbon/flink/TestCarbonPartitionWriter.scala   |  28 ++++
 .../management/CarbonInsertFromStageCommand.scala  | 153 +++++++++++++++++----
 7 files changed, 203 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 4e24618..4a0605e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1521,6 +1521,16 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_QUERY_STAGE_INPUT_DEFAULT = "false";
 
+  /**
+   * Timeout threshold (in milliseconds) of an insert stage job.
+   */
+  public static final String CARBON_INSERT_STAGE_TIMEOUT = "carbon.insert.stage.timeout";
+
+  /**
+   * Default timeout of an insert stage job: 8 hours.
+   */
+  public static final long CARBON_INSERT_STAGE_TIMEOUT_DEFAULT = 28800000;
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Index parameter start here
   //////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 4459420..c2d81db 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -63,6 +63,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
   protected FileSystem fileSystem;
   protected Configuration hadoopConf;
   protected Path path;
+  protected FileStatus fileStatus;
 
   AbstractDFSCarbonFile(String filePath) {
     this(filePath, FileFactory.getConfiguration());
@@ -89,6 +90,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
 
   AbstractDFSCarbonFile(FileStatus fileStatus) {
     this(fileStatus.getPath());
+    this.fileStatus = fileStatus;
   }
 
   @Override
@@ -177,6 +179,9 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
   @Override
   public long getLastModifiedTime() {
     try {
+      if (this.fileStatus != null) {
+        return this.fileStatus.getModificationTime();
+      }
       return fileSystem.getFileStatus(path).getModificationTime();
     } catch (IOException e) {
       throw new CarbonFileException("Unable to get file status: ", e);
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index a238877..90409e8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1904,6 +1904,38 @@ public final class CarbonProperties {
   }
 
   /**
+   * Validate and get the insert stage timeout
+   *
+   * @return insert stage timeout in milliseconds
+   */
+  public static Long getInsertStageTimeout() {
+    String timeout = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT);
+    if (timeout == null) {
+      return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+    } else {
+      try {
+        long configuredValue = Long.parseLong(timeout);
+        if (configuredValue < 0) {
+          LOGGER.warn(String.format("The value \"%s\" configured for key \"%s\" " +
+                  "is invalid. Ignoring it. use default value:\"%s\"", timeout,
+                  CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT,
+                  CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT));
+          return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+        } else {
+          return configuredValue;
+        }
+      } catch (NumberFormatException e) {
+        LOGGER.warn(String.format("The value \"%s\" configured for key \"%s\" " +
+                "is invalid. Ignoring it. use default value:\"%s\"", timeout,
+                CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT,
+                CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT));
+        return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+      }
+    }
+  }
+
+  /**
    * Validate and get query prefetch enable
    *
    * @return boolean prefetch value
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index a862af0..6914bcf 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -60,6 +60,7 @@ public class CarbonTablePath {
   private static final String STAGE_DIR = "stage";
   private static final String STAGE_DATA_DIR = "stage_data";
   public static final String  SUCCESS_FILE_SUBFIX = ".success";
+  public static final String  LOADING_FILE_SUBFIX = ".loading";
   private static final String SNAPSHOT_FILE_NAME = "snapshot";
 
   public static final String SYSTEM_FOLDER_DIR = "_system";
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 1e4e9f3..a1baaaf 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -137,6 +137,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.push.rowfilters.for.vector | false | When enabled complete row filters will be handled by carbon in case of vector. If it is disabled then only page level pruning will be done by carbon and row level filtering will be done by spark for vector. And also there are scan optimizations in carbon to avoid multiple data copies when this parameter is set to false. There is no change in flow for non-vector based queries. |
 | carbon.query.prefetch.enable | true | By default this property is true, so prefetch is used in query to read next blocklet asynchronously in other thread while processing current blocklet in main thread. This can help to reduce CPU idle time. Setting this property false will disable this prefetch feature in query. |
 | carbon.query.stage.input.enable | false | Stage input files are data files written by external applications (such as Flink), but have not been loaded into carbon table. Enabling this configuration makes query to include these files, thus makes query on latest data. However, since these files are not indexed, query maybe slower as full scan is required for these files. |
+| carbon.insert.stage.timeout | 28800000 | Timeout threshold (in milliseconds) of insert stage processing. A stage will be picked up again by other insert stage jobs if its load duration exceeds the configured value. |
 | carbon.driver.pruning.multi.thread.enable.files.count | 100000 | To prune in multi-thread when total number of segment files for a query increases beyond the configured value. |
 | carbon.load.all.segment.indexes.to.cache | true | Setting this configuration to false, will prune and load only matched segment indexes to cache using segment metadata information such as columnid and it's minmax values, which decreases the usage of driver memory.  |
 | carbon.secondary.index.creation.threads | 1 | Specifies the number of threads to concurrently process segments during secondary index creation. This property helps fine tuning the system when there are a lot of segments in a table. The value range is 1 to 50. |
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index ac17df2..61c4121 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -19,6 +19,7 @@ package org.apache.carbon.flink
 
 import java.io.{File, InputStreamReader}
 import java.util
+import java.util.concurrent.{Callable, Executors}
 import java.util.{Base64, Collections, Properties}
 
 import com.google.gson.Gson
@@ -65,7 +66,34 @@ class TestCarbonPartitionWriter extends QueryTest with BeforeAndAfterAll{
       sql(s"INSERT INTO $tableName STAGE")
 
       checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
+    }
+  }
+
+  test("test concurrent insertstage") {
+    createPartitionTable
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+      val writerProperties = newWriterProperties(dataTempPath)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.setParallelism(6)
+      environment.enableCheckpointing(2000L)
+      environment.setRestartStrategy(RestartStrategies.noRestart)
+
+      val dataCount = 1000
+      val source = getTestSource(dataCount)
+      executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
 
+      val executorService = Executors.newFixedThreadPool(10)
+      // submit all jobs first, then wait, so the insert stages run concurrently
+      (1 to 10).map { _ =>
+        executorService.submit(new Runnable {
+          override def run(): Unit =
+            sql(s"INSERT INTO $tableName STAGE OPTIONS('batch_file_count'='1')")
+        })
+      }.foreach(_.get())
+      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
     }
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index e22b89e..cb27db9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileSt
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.CarbonInputSplit
@@ -82,6 +83,8 @@ case class CarbonInsertFromStageCommand(
     val tablePath = table.getTablePath
     val stagePath = CarbonTablePath.getStageDir(tablePath)
     val snapshotFilePath = CarbonTablePath.getStageSnapshotFile(tablePath)
+    var stageFiles: Array[(CarbonFile, CarbonFile)] = Array.empty
+    var executorService: ExecutorService = null
     val lock = acquireIngestLock(table)
 
     try {
@@ -140,7 +143,7 @@ case class CarbonInsertFromStageCommand(
       }
       LOGGER.info("Option [" + CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY +
                   "] value is " + orderType)
-      val stageFiles = listStageFiles(stagePath, hadoopConf, batchSize,
+      stageFiles = listStageFiles(stagePath, hadoopConf, batchSize,
         orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC))
       if (stageFiles.isEmpty) {
         // no stage files, so do nothing
@@ -148,10 +151,26 @@ case class CarbonInsertFromStageCommand(
         return Seq.empty
       }
 
+      // Add a 'loading' tag to the stages being processed.
+      // Different insert stage processes can load different data independently
+      // by choosing only the stages without a 'loading' tag, or the stages
+      // whose load has timed out. This avoids loading the same data in
+      // concurrent insert stage processes. The 'loading' tag is simply an
+      // empty file whose name carries a '.loading' suffix.
+      val numThreads = Math.min(Math.max(stageFiles.length, 1), 10)
+      executorService = Executors.newFixedThreadPool(numThreads)
+      createStageLoadingFilesWithRetry(executorService, stageFiles)
+    } catch {
+      case ex: Throwable =>
+        LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
+        throw ex
+    } finally {
+      lock.unlock()
+    }
+
+    try {
       // 2) read all stage files to collect input files for data loading
       // create a thread pool to read them
-      val numThreads = Math.min(Math.max(stageFiles.length, 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
       val stageInputs = collectStageInputs(executorService, stageFiles)
 
       // 3) perform data loading
@@ -170,8 +189,6 @@ case class CarbonInsertFromStageCommand(
       case ex: Throwable =>
         LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
         throw ex
-    } finally {
-      lock.unlock()
     }
     Seq.empty
   }
@@ -478,32 +495,97 @@ case class CarbonInsertFromStageCommand(
   }
 
   /**
-   * Delete stage file and success file
-   * Return false means the stage files were cleaned successfully
-   * While return true means the stage files were failed to clean
+   * Create '.loading' files to tag the stages being processed.
+   * Returns the stage files whose loading files failed to be created.
+   */
+  private def createStageLoadingFiles(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
+    stageFiles.map { files =>
+      executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
+        override def call(): (CarbonFile, CarbonFile, Boolean) = {
+          // Get the loading files path
+          val stageLoadingFile =
+            FileFactory.getCarbonFile(files._1.getAbsolutePath +
+              CarbonTablePath.LOADING_FILE_SUBFIX);
+          // Try to create the loading file.
+          // isFailed becomes true if createNewFile() returns false,
+          // which can mean the file already exists or an exception occurred.
+          var isFailed = !stageLoadingFile.createNewFile()
+          // If the file already exists, refresh its last-modified time instead.
+          if (isFailed) {
+            // isFailed stays true only if setLastModifiedTime() also fails.
+            isFailed = !stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+          }
+          (files._1, files._2, isFailed)
+        }
+      })
+    }.map { future =>
+      future.get()
+    }.filter { files =>
+      // keep the files whose isFailed is true, so we can retry on them.
+      files._3
+    }.map { files =>
+      (files._1, files._2)
+    }
+  }
+
+  /**
+   * create '.loading' file with retry
+   */
+  private def createStageLoadingFilesWithRetry(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
+    val startTime = System.currentTimeMillis()
+    var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
+    var needToCreateStageLoadingFiles = stageFiles
+    while (retry > 0 && needToCreateStageLoadingFiles.nonEmpty) {
+      needToCreateStageLoadingFiles =
+        createStageLoadingFiles(executorService, needToCreateStageLoadingFiles)
+      retry -= 1
+    }
+    LOGGER.info(s"finished to create stage loading files, time taken: " +
+      s"${System.currentTimeMillis() - startTime}ms")
+    if (needToCreateStageLoadingFiles.nonEmpty) {
+      LOGGER.warn(s"failed to create loading files:" +
+        needToCreateStageLoadingFiles.map(_._1.getName).mkString(","))
+    }
+  }
+
+  /**
+   * Delete stage files, success files and loading files.
+   * Returns the files that failed to be deleted.
    */
   private def deleteStageFiles(
       executorService: ExecutorService,
       stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
     stageFiles.map { files =>
-      executorService.submit(new Callable[Boolean] {
-        override def call(): Boolean = {
+      executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
+        override def call(): (CarbonFile, CarbonFile, Boolean) = {
+          // Delete three types of files: stage, '.success' and '.loading'
+          val stageLoadingFile =
+            FileFactory.getCarbonFile(files._1.getAbsolutePath
+              + CarbonTablePath.LOADING_FILE_SUBFIX);
+          var isFailed = false
           // If delete() return false, maybe the reason is FileNotFount or FileFailedClean.
           // Considering FileNotFound means FileCleanSucessfully.
           // We need double check the file exists or not when delete() return false.
-          if (!(files._1.delete() && files._2.delete())) {
-            // If the file still exists, return ture, let the file filtered in.
+          if (!(files._1.delete() && files._2.delete() && stageLoadingFile.delete())) {
+            // If any of the files still exists, set isFailed to true
             // So we can retry to delete this file.
-            return files._1.exists() || files._1.exists()
+            isFailed = files._1.exists() || files._2.exists() || stageLoadingFile.exists()
           }
-          // When delete successfully, return false, let the file filtered away.
-          false
+          (files._1, files._2, isFailed)
         }
       })
-    }.filter { future =>
+    }.map { future =>
       future.get()
+    }.filter { files =>
+      // keep the files whose isFailed is true, so we can retry on them.
+      files._3
+    }.map { files =>
+      (files._1, files._2)
     }
-    stageFiles
   }
 
   /**
@@ -514,14 +596,18 @@ case class CarbonInsertFromStageCommand(
       stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
     val startTime = System.currentTimeMillis()
     var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
-    while (deleteStageFiles(executorService, stageFiles).length > 0 && retry > 0) {
+    var needToDeleteStageFiles = stageFiles
+    while (retry > 0 && needToDeleteStageFiles.nonEmpty) {
+      needToDeleteStageFiles =
+        deleteStageFiles(executorService, needToDeleteStageFiles)
       retry -= 1
     }
     LOGGER.info(s"finished to delete stage files, time taken: " +
       s"${System.currentTimeMillis() - startTime}ms")
     // if there are still stage files failed to clean, print log.
-    if (stageFiles.length > 0) {
-      LOGGER.warn(s"failed to clean up stage files:" + stageFiles.map(_._1.getName).mkString(","))
+    if (needToDeleteStageFiles.nonEmpty) {
+      LOGGER.warn(s"failed to clean up stage files:" +
+        needToDeleteStageFiles.map(_._1.getName).mkString(","))
     }
   }
 
@@ -569,22 +655,34 @@ case class CarbonInsertFromStageCommand(
   ): Array[(CarbonFile, CarbonFile)] = {
     val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf)
     if (dir.exists()) {
-      // Only HDFS/OBS/S3 server side can guarantee the files got from iterator are sorted
-      // based on file name so that we can use iterator to get the A and A.success together
-      // without loop all files which can improve performance compared with list all files.
-      // One file and another with '.success', so we need *2 as total and this value is just
-      // an approximate value. For local files, as can it can we not guarantee the order, we
-      // just list all.
-      val allFiles = dir.listFiles(false, batchSize * 2)
+      val allFiles = dir.listFiles()
       val successFiles = allFiles.filter { file =>
         file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
       }.map { file =>
         (file.getName.substring(0, file.getName.indexOf(".")), file)
       }.toMap
+
+      // Different insert stage processes can load different data independently
+      // by choosing only the stages without a 'loading' tag, or the stages
+      // whose load has timed out, which avoids loading the same data in
+      // concurrent insert stage processes. A stage is chosen when either:
+      // 1) it has never been loaded, i.e. it has no '.loading' tag, or
+      // 2) its load has timed out; the threshold depends on INSERT_STAGE_TIMEOUT.
+      val loadingFiles = allFiles.filter { file =>
+        file.getName.endsWith(CarbonTablePath.LOADING_FILE_SUBFIX)
+      }.filter { file =>
+        (System.currentTimeMillis() - file.getLastModifiedTime) <
+          CarbonInsertFromStageCommand.INSERT_STAGE_TIMEOUT
+      }.map { file =>
+        (file.getName.substring(0, file.getName.indexOf(".")), file)
+      }.toMap
+
       val stageFiles = allFiles.filter { file =>
         !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
       }.filter { file =>
         successFiles.contains(file.getName)
+      }.filterNot { file =>
+        loadingFiles.contains(file.getName)
       }.sortWith {
         (file1, file2) =>
           if (ascendingSort) {
@@ -655,4 +753,5 @@ object CarbonInsertFromStageCommand {
   */
   val BATCH_FILE_ORDER_DEFAULT: String = BATCH_FILE_ORDER_ASC
 
+  val INSERT_STAGE_TIMEOUT = CarbonProperties.getInsertStageTimeout
 }