You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/09/27 17:48:16 UTC
carbondata git commit: [CARBONDATA-1486] Fixed issue of table status
updation on insert overwrite failure and exception thrown while deletion of
stale folders
Repository: carbondata
Updated Branches:
refs/heads/master d304ef96e -> c4f312fa1
[CARBONDATA-1486] Fixed issue of table status updation on insert overwrite failure and exception thrown while deletion of stale folders
This closes #1368
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c4f312fa
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c4f312fa
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c4f312fa
Branch: refs/heads/master
Commit: c4f312fa1dda5774fac715aa96fde3094f42a592
Parents: d304ef9
Author: manishgupta88 <to...@gmail.com>
Authored: Mon Sep 18 18:03:04 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Sep 28 01:47:56 2017 +0800
----------------------------------------------------------------------
.../carbondata/spark/load/CarbonLoaderUtil.java | 23 +++++++++++++++++--
.../load/DataLoadProcessorStepOnSpark.scala | 2 +-
.../carbondata/spark/util/CommonUtil.scala | 24 ++++++++++++++++++++
.../spark/rdd/CarbonDataRDDFactory.scala | 9 +++++---
.../newflow/sort/SortStepRowUtil.java | 3 +--
.../UnsafeSingleThreadFinalSortFilesMerger.java | 2 +-
6 files changed, 54 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 7c2d157..9fe003f 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -289,9 +289,18 @@ public final class CarbonLoaderUtil {
newMetaEntry.setLoadName(segmentId);
loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
loadModel.setSegmentId(segmentId);
+ // Exception should be thrown if:
+ // 1. If insert overwrite is in progress and any other load or insert operation
+ // is triggered
+ // 2. If load or insert into operation is in progress and insert overwrite operation
+ // is triggered
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())) {
throw new RuntimeException("Already insert overwrite is in progress");
+ } else if (
+ newMetaEntry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())
+ && entry.getLoadStatus().equals(LoadStatusType.IN_PROGRESS.getMessage())) {
+ throw new RuntimeException("Already insert into or load is in progress");
}
}
listOfLoadFolderDetails.add(newMetaEntry);
@@ -318,7 +327,11 @@ public final class CarbonLoaderUtil {
// For insert overwrite, we will delete the old segment folder immediately
// So collect the old segments here
String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
- staleFolders.add(FileFactory.getCarbonFile(path));
+ // add to the deletion list only if file exist else HDFS file system will throw
+ // exception while deleting the file if file path does not exist
+ if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+ staleFolders.add(FileFactory.getCarbonFile(path));
+ }
}
}
}
@@ -328,7 +341,13 @@ public final class CarbonLoaderUtil {
.toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
// Delete all old stale segment folders
for (CarbonFile staleFolder : staleFolders) {
- CarbonUtil.deleteFoldersAndFiles(staleFolder);
+ // try block is inside for loop because even if there is failure in deletion of 1 stale
+ // folder still remaining stale folders should be deleted
+ try {
+ CarbonUtil.deleteFoldersAndFiles(staleFolder);
+ } catch (IOException | InterruptedException e) {
+ LOGGER.error("Failed to delete stale folder: " + e.getMessage());
+ }
}
status = true;
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index aaf7926..6943dcb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -136,7 +136,7 @@ object DataLoadProcessorStepOnSpark {
override def next(): CarbonRow = {
val row =
- new CarbonRow(SortStepRowUtil.convertRow(rows.next().getData, sortParameters, true))
+ new CarbonRow(SortStepRowUtil.convertRow(rows.next().getData, sortParameters))
rowCounter.add(1)
row
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f123624..5040e69 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -537,6 +537,30 @@ object CommonUtil {
}
}
+ /**
+ * Marks the current load entry in the table status file as MARKED_FOR_DELETE; fails via
+ * sys.error if the table status file cannot be updated.
+ * @param model load model whose last load metadata entry is the load that failed
+ */
+ def updateTableStatusForFailure(
+ model: CarbonLoadModel): Unit = {
+ // in case of failure the load status should be "Marked for delete" so that it will be taken
+ // care of during clean up
+ val loadStatus = CarbonCommonConstants.MARKED_FOR_DELETE
+ // the current load entry is always appended last to the load metadata details
+ val loadMetaEntry = model.getLoadMetadataDetails.get(model.getLoadMetadataDetails.size - 1)
+ CarbonLoaderUtil
+ .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp, true)
+ val updationStatus = CarbonLoaderUtil.recordLoadMetadata(loadMetaEntry, model, false, false)
+ if (!updationStatus) {
+ sys
+ .error(s"Failed to update failure entry in table status for ${
+ model
+ .getDatabaseName
+ }.${ model.getTableName }")
+ }
+ }
+
def readLoadMetadataDetails(model: CarbonLoadModel): Unit = {
val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
val details = SegmentStatusManager.readLoadMetadata(metadataPath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0edfccf..3d67a79 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1069,13 +1069,14 @@ object CarbonDataRDDFactory {
return
}
if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+ // update the load entry in table status file for changing the status to failure
+ CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")
LOGGER.audit(s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.warn("Cannot write load metadata file as data load failed")
- updateStatus(status, loadStatus)
throw new Exception(errorMessage)
} else {
// check if data load fails due to bad record and throw data load failure due to
@@ -1083,16 +1084,19 @@ object CarbonDataRDDFactory {
if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
+ // update the load entry in table status file for changing the status to failure
+ CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")
LOGGER.audit(s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- updateStatus(status, CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
throw new Exception(status(0)._2._2.errorMsg)
}
// if segment is empty then fail the data load
if (!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
+ // update the load entry in table status file for changing the status to failure
+ CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")
@@ -1100,7 +1104,6 @@ object CarbonDataRDDFactory {
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
" as there is no data to load")
LOGGER.warn("Cannot write load metadata file as data load failed")
- updateStatus(status, CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
throw new Exception("No Data to load")
}
writeDictionary(carbonLoadModel, result, false)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
index 5238c3c..62434bc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
@@ -21,8 +21,7 @@ import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
public class SortStepRowUtil {
- public static Object[] convertRow(Object[] data, SortParameters parameters,
- boolean needConvertDecimalToByte) {
+ public static Object[] convertRow(Object[] data, SortParameters parameters) {
int measureCount = parameters.getMeasureColCount();
int dimensionCount = parameters.getDimColCount();
int complexDimensionCount = parameters.getComplexDimColCount();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 1455365..e3bbdcb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -183,7 +183,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
* @return sorted row
*/
public Object[] next() {
- return SortStepRowUtil.convertRow(getSortedRecordFromFile(), parameters, false);
+ return SortStepRowUtil.convertRow(getSortedRecordFromFile(), parameters);
}
/**