You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/03/28 15:49:12 UTC

[carbondata] branch master updated: [CARBONDATA-3332] Blocked concurrent compaction and update/delete

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 36e6840  [CARBONDATA-3332] Blocked concurrent compaction and update/delete
36e6840 is described below

commit 36e6840d9fb67e7a2f9de4a6549c513588cdf159
Author: kunal642 <ku...@gmail.com>
AuthorDate: Wed Mar 27 14:44:11 2019 +0530

    [CARBONDATA-3332] Blocked concurrent compaction and update/delete
    
    Problem:
    When update and compaction are executed concurrently, the update tries to modify the contents of segment 0 while compaction has already marked that segment as COMPACTED. The compacted segment is of no use to the update command, so when the update tries to access the segment info from the map, it throws a key-not-found error.
    
    Solution:
    Block concurrent compaction and update/delete operations.
    
    This closes #3166
---
 .../apache/carbondata/core/locks/LockUsage.java    |  1 +
 .../spark/rdd/CarbonDataRDDFactory.scala           | 48 +++++++-----
 .../CarbonAlterTableCompactionCommand.scala        | 55 +++++++------
 .../mutation/CarbonProjectForDeleteCommand.scala   | 18 ++++-
 .../mutation/CarbonProjectForUpdateCommand.scala   | 90 ++++++++++++----------
 5 files changed, 123 insertions(+), 89 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
index b16c3f1..14907c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
@@ -36,5 +36,6 @@ public class LockUsage {
   public static final String STREAMING_LOCK = "streaming.lock";
   public static final String DATAMAP_STATUS_LOCK = "datamapstatus.lock";
   public static final String CONCURRENT_LOAD_LOCK = "concurrentload.lock";
+  public static final String UPDATE_LOCK = "update.lock";
 
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8268379..8a04887 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -54,6 +54,7 @@ import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -867,27 +868,34 @@ object CarbonDataRDDFactory {
         val lock = CarbonLockFactory.getCarbonLockObj(
           carbonTable.getAbsoluteTableIdentifier,
           LockUsage.COMPACTION_LOCK)
-
-        if (lock.lockWithRetries()) {
-          LOGGER.info("Acquired the compaction lock.")
-          try {
-            startCompactionThreads(sqlContext,
-              carbonLoadModel,
-              storeLocation,
-              compactionModel,
-              lock,
-              compactedSegments,
-              operationContext
-            )
-          } catch {
-            case e: Exception =>
-              LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
-              lock.unlock()
-              throw e
+        val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable
+          .getAbsoluteTableIdentifier, LockUsage.UPDATE_LOCK)
+        try {
+          if (updateLock.lockWithRetries(3, 3)) {
+            if (lock.lockWithRetries()) {
+              LOGGER.info("Acquired the compaction lock.")
+              startCompactionThreads(sqlContext,
+                carbonLoadModel,
+                storeLocation,
+                compactionModel,
+                lock,
+                compactedSegments,
+                operationContext
+              )
+            } else {
+              LOGGER.error("Not able to acquire the compaction lock for table " +
+                           s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
+            }
+          } else {
+            throw new ConcurrentOperationException(carbonTable, "update", "compaction")
           }
-        } else {
-          LOGGER.error("Not able to acquire the compaction lock for table " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
+        } catch {
+          case e: Exception =>
+            LOGGER.error(s"Exception in start compaction thread.", e)
+            lock.unlock()
+            throw e
+        } finally {
+          updateLock.unlock()
         }
       }
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 419fa16..9d8bf90 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -288,31 +288,38 @@ case class CarbonAlterTableCompactionCommand(
       val lock = CarbonLockFactory.getCarbonLockObj(
         carbonTable.getAbsoluteTableIdentifier,
         LockUsage.COMPACTION_LOCK)
-
-      if (lock.lockWithRetries()) {
-        LOGGER.info("Acquired the compaction lock for table" +
-                    s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        try {
-          CarbonDataRDDFactory.startCompactionThreads(
-            sqlContext,
-            carbonLoadModel,
-            storeLocation,
-            compactionModel,
-            lock,
-            compactedSegments,
-            operationContext
-          )
-        } catch {
-          case e: Exception =>
-            LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
-            lock.unlock()
-            throw e
+      val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable
+        .getAbsoluteTableIdentifier, LockUsage.UPDATE_LOCK)
+      try {
+        if (updateLock.lockWithRetries(3, 3)) {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock for table" +
+                        s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            CarbonDataRDDFactory.startCompactionThreads(
+              sqlContext,
+              carbonLoadModel,
+              storeLocation,
+              compactionModel,
+              lock,
+              compactedSegments,
+              operationContext
+            )
+          } else {
+            LOGGER.error(s"Not able to acquire the compaction lock for table" +
+                         s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            CarbonException.analysisException(
+              "Table is already locked for compaction. Please try after some time.")
+          }
+        } else {
+          throw new ConcurrentOperationException(carbonTable, "update", "compaction")
         }
-      } else {
-        LOGGER.error(s"Not able to acquire the compaction lock for table" +
-                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        CarbonException.analysisException(
-          "Table is already locked for compaction. Please try after some time.")
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in start compaction thread.", e)
+          lock.unlock()
+          throw e
+      } finally {
+        updateLock.unlock()
       }
     }
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 70a4350..709260e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -51,10 +51,6 @@ private[sql] case class CarbonProjectForDeleteCommand(
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
 
-    if (SegmentStatusManager.isCompactionInProgress(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "compaction", "data delete")
-    }
-
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data delete")
     }
@@ -77,10 +73,22 @@ private[sql] case class CarbonProjectForDeleteCommand(
     val metadataLock = CarbonLockFactory
       .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.METADATA_LOCK)
+    val compactionLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+    val updateLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.UPDATE_LOCK)
     var lockStatus = false
     try {
       lockStatus = metadataLock.lockWithRetries()
       if (lockStatus) {
+        if (!compactionLock.lockWithRetries(3, 3)) {
+          throw new ConcurrentOperationException(carbonTable, "compaction", "delete")
+        }
+        if (!updateLock.lockWithRetries(3, 3)) {
+          throw new ConcurrentOperationException(carbonTable, "update/delete", "delete")
+        }
         LOGGER.info("Successfully able to get the table metadata file lock")
       } else {
         throw new Exception("Table is locked for deletion. Please try after some time")
@@ -134,6 +142,8 @@ private[sql] case class CarbonProjectForDeleteCommand(
       if (lockStatus) {
         CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
       }
+      updateLock.unlock()
+      compactionLock.unlock()
     }
     Seq.empty
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index e4abae1..705ba4b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -79,9 +79,6 @@ private[sql] case class CarbonProjectForUpdateCommand(
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
-    if (SegmentStatusManager.isCompactionInProgress(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "compaction", "data update")
-    }
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data update")
     }
@@ -99,6 +96,10 @@ private[sql] case class CarbonProjectForUpdateCommand(
     val metadataLock = CarbonLockFactory
       .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.METADATA_LOCK)
+    val compactionLock = CarbonLockFactory.getCarbonLockObj(carbonTable
+      .getAbsoluteTableIdentifier, LockUsage.COMPACTION_LOCK)
+    val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.UPDATE_LOCK)
     var lockStatus = false
     // get the current time stamp which should be same for delete and update.
     val currentTime = CarbonUpdateUtil.readCurrentTime
@@ -113,45 +114,51 @@ private[sql] case class CarbonProjectForUpdateCommand(
       else {
         throw new Exception("Table is locked for updation. Please try after some time")
       }
-      // Get RDD.
 
-      dataSet = if (isPersistEnabled) {
-        Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
-          CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
-      }
-      else {
-        Dataset.ofRows(sparkSession, plan)
-      }
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
-
-
-      // handle the clean up of IUD.
-      CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
-
-      // do delete operation.
-      val segmentsToBeDeleted = DeleteExecution.deleteDeltaExecution(
-        databaseNameOp,
-        tableName,
-        sparkSession,
-        dataSet.rdd,
-        currentTime + "",
-        isUpdateOperation = true,
-        executionErrors)
-
-      if (executionErrors.failureCauses != FailureCauses.NONE) {
-        throw new Exception(executionErrors.errorMsg)
+      if (updateLock.lockWithRetries(3, 3)) {
+        if (compactionLock.lockWithRetries(3, 3)) {
+          // Get RDD.
+          dataSet = if (isPersistEnabled) {
+            Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
+              CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
+          }
+          else {
+            Dataset.ofRows(sparkSession, plan)
+          }
+
+          // handle the clean up of IUD.
+          CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+
+          // do delete operation.
+          val segmentsToBeDeleted = DeleteExecution.deleteDeltaExecution(
+            databaseNameOp,
+            tableName,
+            sparkSession,
+            dataSet.rdd,
+            currentTime + "",
+            isUpdateOperation = true,
+            executionErrors)
+
+          if (executionErrors.failureCauses != FailureCauses.NONE) {
+            throw new Exception(executionErrors.errorMsg)
+          }
+
+          // do update operation.
+          performUpdate(dataSet,
+            databaseNameOp,
+            tableName,
+            plan,
+            sparkSession,
+            currentTime,
+            executionErrors,
+            segmentsToBeDeleted)
+        } else {
+          throw new ConcurrentOperationException(carbonTable, "compaction", "update")
+        }
+      } else {
+        throw new ConcurrentOperationException(carbonTable, "update/delete", "update")
       }
-
-      // do update operation.
-      performUpdate(dataSet,
-        databaseNameOp,
-        tableName,
-        plan,
-        sparkSession,
-        currentTime,
-        executionErrors,
-        segmentsToBeDeleted)
-
       if (executionErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executionErrors.errorMsg)
       }
@@ -185,11 +192,12 @@ private[sql] case class CarbonProjectForUpdateCommand(
           sys.error("Update operation failed. " + e.getCause.getMessage)
         }
         sys.error("Update operation failed. please check logs.")
-    }
-    finally {
+    } finally {
       if (null != dataSet && isPersistEnabled) {
         dataSet.unpersist()
       }
+      updateLock.unlock()
+      compactionLock.unlock()
       if (lockStatus) {
         CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
       }