You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/29 06:46:14 UTC
[carbondata] branch master updated: [CARBONDATA-3393] Merge Index
Job Failure should not trigger the merge index job again. Exception should
be propagated to the caller.
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 706e8d3 [CARBONDATA-3393] Merge Index Job Failure should not trigger the merge index job again. Exception should be propagated to the caller.
706e8d3 is described below
commit 706e8d34c40da97e0d123f58eac3f6da3953f4d0
Author: dhatchayani <dh...@gmail.com>
AuthorDate: Tue May 28 19:29:46 2019 +0530
[CARBONDATA-3393] Merge Index Job Failure should not trigger the merge index job again. Exception should be propagated to the caller.
Problem:
If the merge index job fails, the same job is triggered again.
Solution:
The merge index job exception has to be propagated to the caller; the same job should not be triggered again.
Changes:
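(1) A merge index job failure is LOGGED and, by default, propagated to the caller. Setting carbon.merge.index.failure.throw.exception to false makes it only log the failure and continue with the load.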
(2) Add a new method that writes the segment file using only the index files of the current load timestamp. This helps when the merge index job fails and when writing merge index files for an old store.
This closes #3226
---
.../core/constants/CarbonCommonConstants.java | 12 +++++++
.../carbondata/core/metadata/SegmentFileStore.java | 21 +++++++++++
.../org/apache/spark/rdd/CarbonMergeFilesRDD.scala | 41 +++++++++++++++-------
3 files changed, 62 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index aa9dd05..311019c 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -346,6 +346,18 @@ public final class CarbonCommonConstants {
public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
/**
+ * User-defined property that specifies whether to throw an exception when the
+ * MERGE INDEX job fails. Default value - TRUE
+ * TRUE - throws an exception and fails the corresponding LOAD job
+ * FALSE - logs the exception and continues with the LOAD
+ */
+ @CarbonProperty
+ public static final String CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION =
+ "carbon.merge.index.failure.throw.exception";
+
+ public static final String CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION_DEFAULT = "true";
+
+ /**
* property to be used for specifying the max byte limit for string/varchar data type till
* where storing min/max in data file will be considered
*/
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 69e5dc3..cbf58c7 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -139,12 +139,32 @@ public class SegmentFileStore {
*/
public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID)
throws IOException {
+ return writeSegmentFile(carbonTable, segmentId, UUID, null);
+ }
+
+ /**
+ * Write segment file to the metadata folder of the table selecting only the current load files
+ *
+ * @param carbonTable
+ * @param segmentId
+ * @param UUID
+ * @param currentLoadTimeStamp if not null, only index files whose name contains it are selected
+ * @return
+ * @throws IOException
+ */
+ public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
+ final String currentLoadTimeStamp) throws IOException {
String tablePath = carbonTable.getTablePath();
boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath);
CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {
+ if (null != currentLoadTimeStamp) {
+ return file.getName().contains(currentLoadTimeStamp) && (
+ file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+ .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
+ }
return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
}
@@ -185,6 +205,7 @@ public class SegmentFileStore {
return null;
}
+
/**
* Move the loaded data from source folder to destination folder.
*/
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index c101d02..bb930b4 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.sql.SparkSession
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
@@ -38,6 +39,8 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String)
object CarbonMergeFilesRDD {
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
/**
* Merge the carbonindex files with in the segment to carbonindexmerge file inside same segment
*
@@ -70,9 +73,8 @@ object CarbonMergeFilesRDD {
readFileFooterFromCarbonDataFile).collect()
} else {
try {
- if (CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
+ if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+ CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) {
new CarbonMergeFilesRDD(
sparkSession,
carbonTable,
@@ -82,19 +84,34 @@ object CarbonMergeFilesRDD {
readFileFooterFromCarbonDataFile).collect()
}
} catch {
- case _: Exception =>
- if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
- new CarbonMergeFilesRDD(
- sparkSession,
- carbonTable,
- segmentIds,
- segmentFileNameToSegmentIdMap,
- carbonTable.isHivePartitionTable,
- readFileFooterFromCarbonDataFile).collect()
+ case ex: Exception =>
+ val message = "Merge Index files request is failed " +
+ s"for table ${ carbonTable.getTableUniqueName }. " + ex.getMessage
+ LOGGER.error(message)
+ if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION,
+ CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION_DEFAULT)) {
+ throw new RuntimeException(message, ex)
}
}
}
}
+
+ /**
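+ * Read a boolean property from CarbonProperties, falling back to defaultValue when the
+ * property is not set or its value cannot be parsed as a boolean.
+ *
+ * @return the parsed property value, or defaultValue.toBoolean if parsing fails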
+ */
+ def isPropertySet(property: String, defaultValue: String): Boolean = {
+ var mergeIndex: Boolean = false
+ try {
+ mergeIndex = CarbonProperties.getInstance().getProperty(property, defaultValue).toBoolean
+ } catch {
+ case _: Exception =>
+ mergeIndex = defaultValue.toBoolean
+ }
+ mergeIndex
+ }
}
/**