Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/29 06:46:14 UTC

[carbondata] branch master updated: [CARBONDATA-3393] Merge Index Job Failure should not trigger the merge index job again. Exception should be propagated to the caller.

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 706e8d3  [CARBONDATA-3393] Merge Index Job Failure should not trigger the merge index job again. Exception should be propagated to the caller.
706e8d3 is described below

commit 706e8d34c40da97e0d123f58eac3f6da3953f4d0
Author: dhatchayani <dh...@gmail.com>
AuthorDate: Tue May 28 19:29:46 2019 +0530

    [CARBONDATA-3393] Merge Index Job Failure should not trigger the merge index job again. Exception should be propagated to the caller.
    
    Problem:
    If the merge index job fails, the same job is triggered again.
    
    Solution:
    The exception from the merge index job has to be propagated to the caller; the same job
    should not be triggered again.
    
    Changes:
    (1) Do not re-trigger the merge index job on failure. The exception is LOGGED and, when
    carbon.merge.index.failure.throw.exception is true (the default), propagated to the caller.
    (2) Implement a new method to write the SegmentFile based on the current load timestamp.
    This helps in case of merge index failures and in writing the merge index for an old store.
    
    This closes #3226
---
 .../core/constants/CarbonCommonConstants.java      | 12 +++++++
 .../carbondata/core/metadata/SegmentFileStore.java | 21 +++++++++++
 .../org/apache/spark/rdd/CarbonMergeFilesRDD.scala | 41 +++++++++++++++-------
 3 files changed, 62 insertions(+), 12 deletions(-)
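
The new flag introduced below is an ordinary Carbon property, so it can be set
programmatically like any other. A minimal sketch of disabling the default
throw-on-failure behavior, assuming CarbonProperties is reachable from the caller
(the constant name is taken from this diff):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Default is "true": a merge index failure throws and fails the LOAD.
    // Set "false" to only log the failure and let the LOAD continue.
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION,
      "false")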

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index aa9dd05..311019c 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -346,6 +346,18 @@ public final class CarbonCommonConstants {
   public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
 
   /**
+   * User-defined property to specify whether to throw an exception when the MERGE INDEX
+   * JOB fails. Default value - TRUE
+   * TRUE - throws the exception and fails the corresponding LOAD job
+   * FALSE - logs the exception and continues with the LOAD
+   */
+  @CarbonProperty
+  public static final String CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION =
+      "carbon.merge.index.failure.throw.exception";
+
+  public static final String CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION_DEFAULT = "true";
+
+  /**
    * property to be used for specifying the max byte limit for string/varchar data type till
    * where storing min/max in data file will be considered
    */
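
For illustration, a minimal sketch of reading this constant pair, mirroring how the
isPropertySet helper added later in this commit resolves boolean Carbon properties
(this is an illustration, not code from the commit):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Resolves to "true" unless the user has overridden the property.
    val throwOnFailure: Boolean = CarbonProperties.getInstance()
      .getProperty(
        CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION,
        CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION_DEFAULT)
      .toBoolean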
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 69e5dc3..cbf58c7 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -139,12 +139,32 @@ public class SegmentFileStore {
    */
   public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID)
       throws IOException {
+    return writeSegmentFile(carbonTable, segmentId, UUID, null);
+  }
+
+  /**
+   * Write segment file to the metadata folder of the table, selecting only the index files
+   * of the current load
+   *
+   * @param carbonTable          table for which the segment file is written
+   * @param segmentId            segment to which the index files belong
+   * @param UUID                 unique id (load timestamp) used to name the segment file
+   * @param currentLoadTimeStamp if not null, only index files whose names contain this
+   *                             timestamp are included
+   * @return the written segment file name, or null if no matching index files are found
+   * @throws IOException
+   */
+  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
+      final String currentLoadTimeStamp) throws IOException {
     String tablePath = carbonTable.getTablePath();
     boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
     String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
     CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath);
     CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
+        if (null != currentLoadTimeStamp) {
+          return file.getName().contains(currentLoadTimeStamp) && (
+              file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+                  .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
+        }
         return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
             .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
       }
@@ -185,6 +205,7 @@ public class SegmentFileStore {
     return null;
   }
 
+
   /**
    * Move the loaded data from source folder to destination folder.
    */
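
A hedged sketch of calling the new overload from Scala; carbonTable, segmentId and uuid
are assumed to be in scope from the load flow, and the timestamp value shown is
illustrative (in the real flow it would be the load's timestamp):

    import org.apache.carbondata.core.metadata.SegmentFileStore

    // Passing a timestamp restricts the segment file to index files of this
    // load only; passing null keeps the old behavior of listing every index
    // file in the segment folder.
    val currentLoadTimeStamp = "1559039986000"  // illustrative value
    SegmentFileStore.writeSegmentFile(carbonTable, segmentId, uuid, currentLoadTimeStamp)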
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index c101d02..bb930b4 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.sql.SparkSession
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
@@ -38,6 +39,8 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String)
 
 object CarbonMergeFilesRDD {
 
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
   /**
    * Merge the carbonindex files with in the segment to carbonindexmerge file inside same segment
    *
@@ -70,9 +73,8 @@ object CarbonMergeFilesRDD {
         readFileFooterFromCarbonDataFile).collect()
     } else {
       try {
-        if (CarbonProperties.getInstance().getProperty(
-          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
-          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
+        if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) {
           new CarbonMergeFilesRDD(
             sparkSession,
             carbonTable,
@@ -82,19 +84,34 @@ object CarbonMergeFilesRDD {
             readFileFooterFromCarbonDataFile).collect()
         }
       } catch {
-        case _: Exception =>
-          if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
-            new CarbonMergeFilesRDD(
-              sparkSession,
-              carbonTable,
-              segmentIds,
-              segmentFileNameToSegmentIdMap,
-              carbonTable.isHivePartitionTable,
-              readFileFooterFromCarbonDataFile).collect()
+        case ex: Exception =>
+          val message = "Merge Index files request failed " +
+                        s"for table ${ carbonTable.getTableUniqueName }. " + ex.getMessage
+          LOGGER.error(message)
+          if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION,
+            CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION_DEFAULT)) {
+            throw new RuntimeException(message, ex)
           }
       }
     }
   }
+
+  /**
+   * Check whether the given property is set by the user.
+   * If it is not set, or its value is not a valid boolean, fall back to the default value.
+   *
+   * @param property     name of the property to look up
+   * @param defaultValue value to use when the property is unset or unparsable
+   * @return the boolean value of the property
+   */
+  def isPropertySet(property: String, defaultValue: String): Boolean = {
+    try {
+      CarbonProperties.getInstance().getProperty(property, defaultValue).toBoolean
+    } catch {
+      case _: Exception => defaultValue.toBoolean
+    }
+  }
 }
 
 /**