You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/07 11:49:13 UTC

[3/3] carbondata git commit: [CARBONDATA-1669] Clean up code in CarbonDataRDDFactory

[CARBONDATA-1669] Clean up code in CarbonDataRDDFactory

This closes #1467


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0578ba0f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0578ba0f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0578ba0f

Branch: refs/heads/master
Commit: 0578ba0f2f9931a89f9759ea4be97975957280ae
Parents: 11661eb
Author: Jacky Li <ja...@qq.com>
Authored: Tue Nov 7 11:41:30 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Tue Nov 7 19:47:17 2017 +0800

----------------------------------------------------------------------
 .../carbondata/core/locks/CarbonLockUtil.java   |   19 +-
 .../org/apache/carbondata/api/CarbonStore.scala |   39 +-
 .../spark/rdd/DataManagementFunc.scala          |  231 +--
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |    3 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 1444 ++++++++----------
 .../spark/sql/CarbonCatalystOperators.scala     |    1 -
 .../AlterTableCompactionCommand.scala           |  124 +-
 .../AlterTableDropCarbonPartitionCommand.scala  |  133 +-
 .../AlterTableSplitCarbonPartitionCommand.scala |  156 +-
 .../restructure/AlterTableRevertTestCase.scala  |    2 +-
 10 files changed, 1044 insertions(+), 1108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index 60a7564..c02a168 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -65,20 +65,27 @@ public class CarbonLockUtil {
   /**
    * Given a lock type this method will return a new lock object if not acquired by any other
    * operation
-   *
-   * @param identifier
-   * @param lockType
-   * @return
    */
-  public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType) {
+  public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType,
+      String errorMsg) {
     ICarbonLock carbonLock = CarbonLockFactory.getCarbonLockObj(identifier, lockType);
     LOGGER.info("Trying to acquire lock: " + carbonLock);
     if (carbonLock.lockWithRetries()) {
       LOGGER.info("Successfully acquired the lock " + carbonLock);
     } else {
-      throw new RuntimeException("Table is locked for updation. Please try after some time");
+      LOGGER.error(errorMsg);
+      throw new RuntimeException(errorMsg);
     }
     return carbonLock;
   }
 
+  /**
+   * Get and lock with default error message
+   */
+  public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType) {
+    return getLockObject(identifier,
+        lockType,
+        "Acquire table lock failed after retry, please try after some time");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 4a66d0f..e77f5c3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -28,8 +28,11 @@ import org.apache.spark.sql.types.TimestampType
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.DataManagementFunc
@@ -79,16 +82,34 @@ object CarbonStore {
       dbName: String,
       tableName: String,
       storePath: String,
-      carbonTable: CarbonTable, forceTableClean: Boolean): Unit = {
+      carbonTable: CarbonTable,
+      forceTableClean: Boolean): Unit = {
     LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
+    var carbonCleanFilesLock: ICarbonLock = null
+    val identifier = new CarbonTableIdentifier(dbName, tableName, "")
     try {
-      DataManagementFunc.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean)
-      LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.")
-    } catch {
-      case ex: Exception =>
-        sys.error(ex.getMessage)
+      val errorMsg = "Clean files request is failed for " +
+                     s"$dbName.$tableName" +
+                     ". Not able to acquire the clean files lock due to another clean files " +
+                     "operation is running in the background."
+      carbonCleanFilesLock =
+        CarbonLockUtil.getLockObject(identifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      if (forceTableClean) {
+        val absIdent = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absIdent.getTablePath,
+            FileFactory.getFileType(absIdent.getTablePath)))
+      } else {
+        DataManagementFunc.deleteLoadsAndUpdateMetadata(dbName, tableName, storePath,
+          isForceDeletion = true, carbonTable)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+      }
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
     }
-    Seq.empty
+    LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.")
   }
 
   // validates load ids
@@ -163,7 +184,7 @@ object CarbonStore {
     val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new
         SegmentStatusManager(
           identifier).getValidAndInvalidSegments
-    return validAndInvalidSegments.getValidSegments.contains(segmentId)
+    validAndInvalidSegments.getValidSegments.contains(segmentId)
   }
 
   private def validateTimeFormat(timestamp: String): Long = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index c2029e5..cbdb336 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -21,25 +21,20 @@ import java.util
 import java.util.concurrent._
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel, DropPartitionCallableModel, SplitPartitionCallableModel}
+import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, LoadMetadataUtil}
-import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.compaction.CompactionCallable
-import org.apache.carbondata.spark.partition.{DropPartitionCallable, SplitPartitionCallable}
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
@@ -49,105 +44,6 @@ object DataManagementFunc {
 
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  def deleteLoadByDate(
-      sqlContext: SQLContext,
-      schema: CarbonDataLoadSchema,
-      databaseName: String,
-      tableName: String,
-      storePath: String,
-      dateField: String,
-      dateFieldActualName: String,
-      dateValue: String) {
-
-    val sc = sqlContext
-    // Delete the records based on data
-    val table = schema.getCarbonTable
-    val loadMetadataDetailsArray =
-      SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList
-    val resultMap = new CarbonDeleteLoadByDateRDD(
-      sc.sparkContext,
-      new DeletedLoadResultImpl(),
-      databaseName,
-      table.getDatabaseName,
-      dateField,
-      dateFieldActualName,
-      dateValue,
-      table.getFactTableName,
-      tableName,
-      storePath,
-      loadMetadataDetailsArray).collect.groupBy(_._1)
-
-    var updatedLoadMetadataDetailsList = new ListBuffer[LoadMetadataDetails]()
-    if (resultMap.nonEmpty) {
-      if (resultMap.size == 1) {
-        if (resultMap.contains("")) {
-          LOGGER.error("Delete by Date request is failed")
-          sys.error("Delete by Date request is failed, potential causes " +
-              "Empty store or Invalid column type, For more details please refer logs.")
-        }
-      }
-      val updatedloadMetadataDetails = loadMetadataDetailsArray.map { elem => {
-        var statusList = resultMap.get(elem.getLoadName)
-        // check for the merged load folder.
-        if (statusList.isEmpty && null != elem.getMergedLoadName) {
-          statusList = resultMap.get(elem.getMergedLoadName)
-        }
-
-        if (statusList.isDefined) {
-          elem.setModificationOrdeletionTimesStamp(elem.getTimeStamp(CarbonLoaderUtil
-            .readCurrentTime()))
-          // if atleast on CarbonCommonConstants.MARKED_FOR_UPDATE status exist,
-          // use MARKED_FOR_UPDATE
-          if (statusList.get
-              .forall(status => status._2 == CarbonCommonConstants.MARKED_FOR_DELETE)) {
-            elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
-          } else {
-            elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_UPDATE)
-            updatedLoadMetadataDetailsList += elem
-          }
-          elem
-        } else {
-          elem
-        }
-      }
-
-      }
-
-      // Save the load metadata
-      val carbonLock = CarbonLockFactory
-          .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-            LockUsage.METADATA_LOCK
-          )
-      try {
-        if (carbonLock.lockWithRetries()) {
-          LOGGER.info("Successfully got the table metadata file lock")
-          if (updatedLoadMetadataDetailsList.nonEmpty) {
-            // TODO: Load Aggregate tables after retention.
-          }
-
-          // write
-          CarbonLoaderUtil.writeLoadMetadata(
-            storePath,
-            databaseName,
-            table.getDatabaseName,
-            updatedloadMetadataDetails.asJava
-          )
-        }
-      } finally {
-        if (carbonLock.unlock()) {
-          LOGGER.info("unlock the table metadata file successfully")
-        } else {
-          LOGGER.error("Unable to unlock the metadata lock")
-        }
-      }
-    } else {
-      LOGGER.error("Delete by Date request is failed")
-      LOGGER.audit(s"The delete load by date is failed for $databaseName.$tableName")
-      sys.error("Delete by Date request is failed, potential causes " +
-          "Empty store or Invalid column type, For more details please refer logs.")
-    }
-  }
-
   def executeCompaction(carbonLoadModel: CarbonLoadModel,
       compactionModel: CompactionModel,
       executor: ExecutorService,
@@ -226,8 +122,6 @@ object DataManagementFunc {
 
   /**
    * This will submit the loads to be merged into the executor.
-   *
-   * @param futureList
    */
   private def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
       loadsToMerge: util.List[LoadMetadataDetails],
@@ -235,14 +129,14 @@ object DataManagementFunc {
       sqlContext: SQLContext,
       compactionModel: CompactionModel,
       carbonLoadModel: CarbonLoadModel,
-      storeLocation: String): Unit = {
-
-    loadsToMerge.asScala.foreach(seg => {
+      storeLocation: String
+  ): Unit = {
+    loadsToMerge.asScala.foreach { seg =>
       LOGGER.info("loads identified for merge is " + seg.getLoadName)
     }
-    )
 
-    val compactionCallableModel = CompactionCallableModel(carbonLoadModel,
+    val compactionCallableModel = CompactionCallableModel(
+      carbonLoadModel,
       storeLocation,
       compactionModel.carbonTable,
       loadsToMerge,
@@ -254,80 +148,6 @@ object DataManagementFunc {
     futureList.add(future)
   }
 
-  def executePartitionSplit( sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      executor: ExecutorService,
-      segment: String,
-      partitionId: String,
-      oldPartitionIdList: List[Int]): Unit = {
-    val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
-      CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
-    )
-    scanSegmentsForSplitPartition(futureList, executor, segment, partitionId,
-      sqlContext, carbonLoadModel, oldPartitionIdList)
-    try {
-        futureList.asScala.foreach(future => {
-          future.get
-        }
-      )
-    } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
-        throw e
-    }
-  }
-
-  private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]],
-      executor: ExecutorService,
-      segmentId: String,
-      partitionId: String,
-      sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      oldPartitionIdList: List[Int]): Unit = {
-
-    val splitModel = SplitPartitionCallableModel(carbonLoadModel,
-      segmentId,
-      partitionId,
-      oldPartitionIdList,
-      sqlContext)
-
-    val future: Future[Void] = executor.submit(new SplitPartitionCallable(splitModel))
-    futureList.add(future)
-  }
-
-  def executeDroppingPartition(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      executor: ExecutorService,
-      segmentId: String,
-      partitionId: String,
-      dropWithData: Boolean,
-      oldPartitionIds: List[Int]): Unit = {
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val model = new DropPartitionCallableModel(carbonLoadModel,
-      segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, sqlContext)
-    val future: Future[Void] = executor.submit(new DropPartitionCallable(model))
-    try {
-        future.get
-    } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
-        throw e
-    }
-  }
-
-  def prepareCarbonLoadModel(table: CarbonTable, newCarbonLoadModel: CarbonLoadModel): Unit = {
-    newCarbonLoadModel.setTableName(table.getFactTableName)
-    val dataLoadSchema = new CarbonDataLoadSchema(table)
-    // Need to fill dimension relation
-    newCarbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-    newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
-    newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
-    newCarbonLoadModel.setStorePath(table.getStorePath)
-    CommonUtil.readLoadMetadataDetails(newCarbonLoadModel)
-    val loadStartTime = CarbonUpdateUtil.readCurrentTime();
-    newCarbonLoadModel.setFactTimeStamp(loadStartTime)
-  }
-
   def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
     // Deleting the any partially loaded data if present.
     // in some case the segment folder which is present in store will not have entry in
@@ -397,39 +217,4 @@ object DataManagementFunc {
     }
   }
 
-  def cleanFiles(
-      dbName: String,
-      tableName: String,
-      storePath: String,
-      carbonTable: CarbonTable,
-      forceTableClean: Boolean): Unit = {
-    val identifier = new CarbonTableIdentifier(dbName, tableName, "")
-    val carbonCleanFilesLock =
-      CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.CLEAN_FILES_LOCK)
-    try {
-      if (carbonCleanFilesLock.lockWithRetries()) {
-        LOGGER.info("Clean files lock has been successfully acquired.")
-        if (forceTableClean) {
-          val absIdent = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
-          FileFactory.deleteAllCarbonFilesOfDir(
-            FileFactory.getCarbonFile(absIdent.getTablePath,
-            FileFactory.getFileType(absIdent.getTablePath)))
-        } else {
-          deleteLoadsAndUpdateMetadata(dbName, tableName, storePath,
-            isForceDeletion = true, carbonTable)
-          CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
-        }
-      } else {
-        val errorMsg = "Clean files request is failed for " +
-            s"$dbName.$tableName" +
-            ". Not able to acquire the clean files lock due to another clean files " +
-            "operation is running in the background."
-        LOGGER.audit(errorMsg)
-        LOGGER.error(errorMsg)
-        throw new Exception(errorMsg + " Please try after some time.")
-      }
-    } finally {
-      CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index 4d782c9..eb07240 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -35,7 +35,8 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil
  */
 object UpdateDataLoad {
 
-  def DataLoadForUpdate(segId: String,
+  def DataLoadForUpdate(
+      segId: String,
       index: Int,
       iter: Iterator[Row],
       carbonLoadModel: CarbonLoadModel,