You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/16 13:12:42 UTC
carbondata git commit: [HOTFIX] Fix CI random failure
Repository: carbondata
Updated Branches:
refs/heads/master a386f1f4e -> 04ff36764
[HOTFIX] Fix CI random failure
This closes #2068
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/04ff3676
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/04ff3676
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/04ff3676
Branch: refs/heads/master
Commit: 04ff36764c797264f5396fa3cbf1d6fe883737e0
Parents: a386f1f
Author: Jacky Li <ja...@qq.com>
Authored: Thu Mar 15 19:49:07 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 16 21:12:25 2018 +0800
----------------------------------------------------------------------
.../statusmanager/SegmentStatusManager.java | 66 ++++++++++++--------
.../carbondata/spark/util/CommonUtil.scala | 2 +-
.../preaaggregate/PreAggregateTableHelper.scala | 13 +++-
3 files changed, 53 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/04ff3676/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 820a5a4..f466018 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -30,7 +30,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.carbondata.common.exceptions.TableStatusLockException;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -834,10 +833,30 @@ public class SegmentStatusManager {
}
}
- public static void deleteLoadsAndUpdateMetadata(
+ private static class ReturnTuple {
+ LoadMetadataDetails[] details;
+ boolean isUpdateRequired;
+ ReturnTuple(LoadMetadataDetails[] details, boolean isUpdateRequired) {
+ this.details = details;
+ this.isUpdateRequired = isUpdateRequired;
+ }
+ }
+
+ private static ReturnTuple isUpdationRequired(
+ boolean isForceDeletion,
CarbonTable carbonTable,
- boolean isForceDeletion) throws IOException {
- deleteLoadsAndUpdateMetadata(carbonTable, isForceDeletion, null);
+ AbsoluteTableIdentifier absoluteTableIdentifier) {
+ LoadMetadataDetails[] details =
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+ // Delete marked loads
+ boolean isUpdationRequired =
+ DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
+ absoluteTableIdentifier,
+ isForceDeletion,
+ details,
+ carbonTable.getMetadataPath()
+ );
+ return new ReturnTuple(details, isUpdationRequired);
}
public static void deleteLoadsAndUpdateMetadata(
@@ -845,31 +864,29 @@ public class SegmentStatusManager {
boolean isForceDeletion,
List<PartitionSpec> partitionSpecs) throws IOException {
if (isLoadDeletionRequired(carbonTable.getMetadataPath())) {
- LoadMetadataDetails[] details =
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
- ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
- identifier, LockUsage.TABLE_STATUS_LOCK);
-
- // Delete marked loads
- boolean isUpdationRequired = DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
- identifier, isForceDeletion, details, carbonTable.getMetadataPath());
-
- boolean updationCompletionStatus = false;
-
- if (isUpdationRequired) {
+ ReturnTuple tuple = isUpdationRequired(isForceDeletion, carbonTable, identifier);
+ if (tuple.isUpdateRequired) {
+ ICarbonLock carbonTableStatusLock =
+ CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
+ boolean locked = false;
try {
// Update load metadate file after cleaning deleted nodes
- if (carbonTableStatusLock.lockWithRetries()) {
+ locked = carbonTableStatusLock.lockWithRetries();
+ if (locked) {
LOG.info("Table status lock has been successfully acquired.");
-
+ // Again read status and check to verify updation required or not.
+ ReturnTuple tuple2 = isUpdationRequired(isForceDeletion, carbonTable, identifier);
+ if (!tuple2.isUpdateRequired) {
+ return;
+ }
// read latest table status again.
LoadMetadataDetails[] latestMetadata =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
// update the metadata details from old to new status.
List<LoadMetadataDetails> latestStatus =
- updateLoadMetadataFromOldToNew(details, latestMetadata);
+ updateLoadMetadataFromOldToNew(tuple2.details, latestMetadata);
writeLoadMetadata(identifier, latestStatus);
} else {
@@ -881,14 +898,13 @@ public class SegmentStatusManager {
"running in the background.";
LOG.audit(errorMsg);
LOG.error(errorMsg);
- throw new TableStatusLockException(errorMsg + " Please try after some time.");
+ throw new IOException(errorMsg + " Please try after some time.");
}
- updationCompletionStatus = true;
+ DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
+ identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs);
} finally {
- CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
- if (updationCompletionStatus) {
- DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
- identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs);
+ if (locked) {
+ CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/04ff3676/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index d3093fb..77ff139 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -815,7 +815,7 @@ object CommonUtil {
try {
val carbonTable = CarbonMetadata.getInstance
.getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
} catch {
case _: Exception =>
LOGGER.warn(s"Error while cleaning table " +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/04ff3676/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index b64c91e..d89aa5b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -26,12 +26,13 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
/**
* Below helper class will be used to create pre-aggregate table
@@ -164,7 +165,15 @@ case class PreAggregateTableHelper(
// This will be used to check if the parent table has any segments or not. If not then no
// need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
// table.
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+ parentTable,
+ false,
+ CarbonFilters.getCurrentPartitions(
+ sparkSession,
+ TableIdentifier(parentTable.getTableName,
+ Some(parentTable.getDatabaseName))
+ ).map(_.asJava).orNull)
+
if (SegmentStatusManager.isLoadInProgressInTable(parentTable)) {
throw new UnsupportedOperationException(
"Cannot create pre-aggregate table when insert is in progress on main table")