You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/16 13:12:42 UTC

carbondata git commit: [HOTFIX] Fix CI random failure

Repository: carbondata
Updated Branches:
  refs/heads/master a386f1f4e -> 04ff36764


[HOTFIX] Fix CI random failure

This closes #2068


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/04ff3676
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/04ff3676
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/04ff3676

Branch: refs/heads/master
Commit: 04ff36764c797264f5396fa3cbf1d6fe883737e0
Parents: a386f1f
Author: Jacky Li <ja...@qq.com>
Authored: Thu Mar 15 19:49:07 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 16 21:12:25 2018 +0800

----------------------------------------------------------------------
 .../statusmanager/SegmentStatusManager.java     | 66 ++++++++++++--------
 .../carbondata/spark/util/CommonUtil.scala      |  2 +-
 .../preaaggregate/PreAggregateTableHelper.scala | 13 +++-
 3 files changed, 53 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/04ff3676/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 820a5a4..f466018 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -30,7 +30,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.carbondata.common.exceptions.TableStatusLockException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -834,10 +833,30 @@ public class SegmentStatusManager {
     }
   }
 
-  public static void deleteLoadsAndUpdateMetadata(
+  private static class ReturnTuple {
+    LoadMetadataDetails[] details;
+    boolean isUpdateRequired;
+    ReturnTuple(LoadMetadataDetails[] details, boolean isUpdateRequired) {
+      this.details = details;
+      this.isUpdateRequired = isUpdateRequired;
+    }
+  }
+
+  private static ReturnTuple isUpdationRequired(
+      boolean isForceDeletion,
       CarbonTable carbonTable,
-      boolean isForceDeletion) throws IOException {
-    deleteLoadsAndUpdateMetadata(carbonTable, isForceDeletion, null);
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    LoadMetadataDetails[] details =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+    // Delete marked loads
+    boolean isUpdationRequired =
+        DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
+            absoluteTableIdentifier,
+            isForceDeletion,
+            details,
+            carbonTable.getMetadataPath()
+        );
+    return new ReturnTuple(details, isUpdationRequired);
   }
 
   public static void deleteLoadsAndUpdateMetadata(
@@ -845,31 +864,29 @@ public class SegmentStatusManager {
       boolean isForceDeletion,
       List<PartitionSpec> partitionSpecs) throws IOException {
     if (isLoadDeletionRequired(carbonTable.getMetadataPath())) {
-      LoadMetadataDetails[] details =
-          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
       AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
-      ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
-          identifier, LockUsage.TABLE_STATUS_LOCK);
-
-      // Delete marked loads
-      boolean isUpdationRequired = DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
-          identifier, isForceDeletion, details, carbonTable.getMetadataPath());
-
-      boolean updationCompletionStatus = false;
-
-      if (isUpdationRequired) {
+      ReturnTuple tuple = isUpdationRequired(isForceDeletion, carbonTable, identifier);
+      if (tuple.isUpdateRequired) {
+        ICarbonLock carbonTableStatusLock =
+            CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
+        boolean locked = false;
         try {
           // Update load metadata file after cleaning deleted nodes
-          if (carbonTableStatusLock.lockWithRetries()) {
+          locked = carbonTableStatusLock.lockWithRetries();
+          if (locked) {
             LOG.info("Table status lock has been successfully acquired.");
-
+            // Read the table status again to check whether an update is still required.
+            ReturnTuple tuple2 = isUpdationRequired(isForceDeletion, carbonTable, identifier);
+            if (!tuple2.isUpdateRequired) {
+              return;
+            }
             // read latest table status again.
             LoadMetadataDetails[] latestMetadata =
                 SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
 
             // update the metadata details from old to new status.
             List<LoadMetadataDetails> latestStatus =
-                updateLoadMetadataFromOldToNew(details, latestMetadata);
+                updateLoadMetadataFromOldToNew(tuple2.details, latestMetadata);
 
             writeLoadMetadata(identifier, latestStatus);
           } else {
@@ -881,14 +898,13 @@ public class SegmentStatusManager {
                 "running in the background.";
             LOG.audit(errorMsg);
             LOG.error(errorMsg);
-            throw new TableStatusLockException(errorMsg + " Please try after some time.");
+            throw new IOException(errorMsg + " Please try after some time.");
           }
-          updationCompletionStatus = true;
+          DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
+              identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs);
         } finally {
-          CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
-          if (updationCompletionStatus) {
-            DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
-                identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs);
+          if (locked) {
+            CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/04ff3676/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index d3093fb..77ff139 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -815,7 +815,7 @@ object CommonUtil {
                 try {
                   val carbonTable = CarbonMetadata.getInstance
                     .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
-                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
                 } catch {
                   case _: Exception =>
                     LOGGER.warn(s"Error while cleaning table " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/04ff3676/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index b64c91e..d89aa5b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -26,12 +26,13 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 
 /**
  * Below helper class will be used to create pre-aggregate table
@@ -164,7 +165,15 @@ case class PreAggregateTableHelper(
     // This will be used to check if the parent table has any segments or not. If not then no
     // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
     // table.
-    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+      parentTable,
+      false,
+      CarbonFilters.getCurrentPartitions(
+        sparkSession,
+        TableIdentifier(parentTable.getTableName,
+          Some(parentTable.getDatabaseName))
+      ).map(_.asJava).orNull)
+
     if (SegmentStatusManager.isLoadInProgressInTable(parentTable)) {
       throw new UnsupportedOperationException(
         "Cannot create pre-aggregate table when insert is in progress on main table")