You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/07 11:49:13 UTC
[3/3] carbondata git commit: [CARBONDATA-1669] Clean up code in CarbonDataRDDFactory
[CARBONDATA-1669] Clean up code in CarbonDataRDDFactory
This closes #1467
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0578ba0f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0578ba0f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0578ba0f
Branch: refs/heads/master
Commit: 0578ba0f2f9931a89f9759ea4be97975957280ae
Parents: 11661eb
Author: Jacky Li <ja...@qq.com>
Authored: Tue Nov 7 11:41:30 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Tue Nov 7 19:47:17 2017 +0800
----------------------------------------------------------------------
.../carbondata/core/locks/CarbonLockUtil.java | 19 +-
.../org/apache/carbondata/api/CarbonStore.scala | 39 +-
.../spark/rdd/DataManagementFunc.scala | 231 +--
.../carbondata/spark/rdd/UpdateDataLoad.scala | 3 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 1444 ++++++++----------
.../spark/sql/CarbonCatalystOperators.scala | 1 -
.../AlterTableCompactionCommand.scala | 124 +-
.../AlterTableDropCarbonPartitionCommand.scala | 133 +-
.../AlterTableSplitCarbonPartitionCommand.scala | 156 +-
.../restructure/AlterTableRevertTestCase.scala | 2 +-
10 files changed, 1044 insertions(+), 1108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index 60a7564..c02a168 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -65,20 +65,27 @@ public class CarbonLockUtil {
/**
* Given a lock type this method will return a new lock object if not acquired by any other
* operation
- *
- * @param identifier
- * @param lockType
- * @return
*/
- public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType) {
+ public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType,
+ String errorMsg) {
ICarbonLock carbonLock = CarbonLockFactory.getCarbonLockObj(identifier, lockType);
LOGGER.info("Trying to acquire lock: " + carbonLock);
if (carbonLock.lockWithRetries()) {
LOGGER.info("Successfully acquired the lock " + carbonLock);
} else {
- throw new RuntimeException("Table is locked for updation. Please try after some time");
+ LOGGER.error(errorMsg);
+ throw new RuntimeException(errorMsg);
}
return carbonLock;
}
+ /**
+ * Get and lock with default error message
+ */
+ public static ICarbonLock getLockObject(CarbonTableIdentifier identifier, String lockType) {
+ return getLockObject(identifier,
+ lockType,
+ "Acquire table lock failed after retry, please try after some time");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 4a66d0f..e77f5c3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -28,8 +28,11 @@ import org.apache.spark.sql.types.TimestampType
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.DataManagementFunc
@@ -79,16 +82,34 @@ object CarbonStore {
dbName: String,
tableName: String,
storePath: String,
- carbonTable: CarbonTable, forceTableClean: Boolean): Unit = {
+ carbonTable: CarbonTable,
+ forceTableClean: Boolean): Unit = {
LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
+ var carbonCleanFilesLock: ICarbonLock = null
+ val identifier = new CarbonTableIdentifier(dbName, tableName, "")
try {
- DataManagementFunc.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean)
- LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.")
- } catch {
- case ex: Exception =>
- sys.error(ex.getMessage)
+ val errorMsg = "Clean files request is failed for " +
+ s"$dbName.$tableName" +
+ ". Not able to acquire the clean files lock due to another clean files " +
+ "operation is running in the background."
+ carbonCleanFilesLock =
+ CarbonLockUtil.getLockObject(identifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+ if (forceTableClean) {
+ val absIdent = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
+ FileFactory.deleteAllCarbonFilesOfDir(
+ FileFactory.getCarbonFile(absIdent.getTablePath,
+ FileFactory.getFileType(absIdent.getTablePath)))
+ } else {
+ DataManagementFunc.deleteLoadsAndUpdateMetadata(dbName, tableName, storePath,
+ isForceDeletion = true, carbonTable)
+ CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+ }
+ } finally {
+ if (carbonCleanFilesLock != null) {
+ CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+ }
}
- Seq.empty
+ LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.")
}
// validates load ids
@@ -163,7 +184,7 @@ object CarbonStore {
val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new
SegmentStatusManager(
identifier).getValidAndInvalidSegments
- return validAndInvalidSegments.getValidSegments.contains(segmentId)
+ validAndInvalidSegments.getValidSegments.contains(segmentId)
}
private def validateTimeFormat(timestamp: String): Long = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index c2029e5..cbdb336 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -21,25 +21,20 @@ import java.util
import java.util.concurrent._
import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel, DropPartitionCallableModel, SplitPartitionCallableModel}
+import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, LoadMetadataUtil}
-import org.apache.carbondata.spark._
import org.apache.carbondata.spark.compaction.CompactionCallable
-import org.apache.carbondata.spark.partition.{DropPartitionCallable, SplitPartitionCallable}
import org.apache.carbondata.spark.util.CommonUtil
/**
@@ -49,105 +44,6 @@ object DataManagementFunc {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- def deleteLoadByDate(
- sqlContext: SQLContext,
- schema: CarbonDataLoadSchema,
- databaseName: String,
- tableName: String,
- storePath: String,
- dateField: String,
- dateFieldActualName: String,
- dateValue: String) {
-
- val sc = sqlContext
- // Delete the records based on data
- val table = schema.getCarbonTable
- val loadMetadataDetailsArray =
- SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList
- val resultMap = new CarbonDeleteLoadByDateRDD(
- sc.sparkContext,
- new DeletedLoadResultImpl(),
- databaseName,
- table.getDatabaseName,
- dateField,
- dateFieldActualName,
- dateValue,
- table.getFactTableName,
- tableName,
- storePath,
- loadMetadataDetailsArray).collect.groupBy(_._1)
-
- var updatedLoadMetadataDetailsList = new ListBuffer[LoadMetadataDetails]()
- if (resultMap.nonEmpty) {
- if (resultMap.size == 1) {
- if (resultMap.contains("")) {
- LOGGER.error("Delete by Date request is failed")
- sys.error("Delete by Date request is failed, potential causes " +
- "Empty store or Invalid column type, For more details please refer logs.")
- }
- }
- val updatedloadMetadataDetails = loadMetadataDetailsArray.map { elem => {
- var statusList = resultMap.get(elem.getLoadName)
- // check for the merged load folder.
- if (statusList.isEmpty && null != elem.getMergedLoadName) {
- statusList = resultMap.get(elem.getMergedLoadName)
- }
-
- if (statusList.isDefined) {
- elem.setModificationOrdeletionTimesStamp(elem.getTimeStamp(CarbonLoaderUtil
- .readCurrentTime()))
- // if atleast on CarbonCommonConstants.MARKED_FOR_UPDATE status exist,
- // use MARKED_FOR_UPDATE
- if (statusList.get
- .forall(status => status._2 == CarbonCommonConstants.MARKED_FOR_DELETE)) {
- elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
- } else {
- elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_UPDATE)
- updatedLoadMetadataDetailsList += elem
- }
- elem
- } else {
- elem
- }
- }
-
- }
-
- // Save the load metadata
- val carbonLock = CarbonLockFactory
- .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK
- )
- try {
- if (carbonLock.lockWithRetries()) {
- LOGGER.info("Successfully got the table metadata file lock")
- if (updatedLoadMetadataDetailsList.nonEmpty) {
- // TODO: Load Aggregate tables after retention.
- }
-
- // write
- CarbonLoaderUtil.writeLoadMetadata(
- storePath,
- databaseName,
- table.getDatabaseName,
- updatedloadMetadataDetails.asJava
- )
- }
- } finally {
- if (carbonLock.unlock()) {
- LOGGER.info("unlock the table metadata file successfully")
- } else {
- LOGGER.error("Unable to unlock the metadata lock")
- }
- }
- } else {
- LOGGER.error("Delete by Date request is failed")
- LOGGER.audit(s"The delete load by date is failed for $databaseName.$tableName")
- sys.error("Delete by Date request is failed, potential causes " +
- "Empty store or Invalid column type, For more details please refer logs.")
- }
- }
-
def executeCompaction(carbonLoadModel: CarbonLoadModel,
compactionModel: CompactionModel,
executor: ExecutorService,
@@ -226,8 +122,6 @@ object DataManagementFunc {
/**
* This will submit the loads to be merged into the executor.
- *
- * @param futureList
*/
private def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
loadsToMerge: util.List[LoadMetadataDetails],
@@ -235,14 +129,14 @@ object DataManagementFunc {
sqlContext: SQLContext,
compactionModel: CompactionModel,
carbonLoadModel: CarbonLoadModel,
- storeLocation: String): Unit = {
-
- loadsToMerge.asScala.foreach(seg => {
+ storeLocation: String
+ ): Unit = {
+ loadsToMerge.asScala.foreach { seg =>
LOGGER.info("loads identified for merge is " + seg.getLoadName)
}
- )
- val compactionCallableModel = CompactionCallableModel(carbonLoadModel,
+ val compactionCallableModel = CompactionCallableModel(
+ carbonLoadModel,
storeLocation,
compactionModel.carbonTable,
loadsToMerge,
@@ -254,80 +148,6 @@ object DataManagementFunc {
futureList.add(future)
}
- def executePartitionSplit( sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- executor: ExecutorService,
- segment: String,
- partitionId: String,
- oldPartitionIdList: List[Int]): Unit = {
- val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
- CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
- )
- scanSegmentsForSplitPartition(futureList, executor, segment, partitionId,
- sqlContext, carbonLoadModel, oldPartitionIdList)
- try {
- futureList.asScala.foreach(future => {
- future.get
- }
- )
- } catch {
- case e: Exception =>
- LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
- throw e
- }
- }
-
- private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]],
- executor: ExecutorService,
- segmentId: String,
- partitionId: String,
- sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- oldPartitionIdList: List[Int]): Unit = {
-
- val splitModel = SplitPartitionCallableModel(carbonLoadModel,
- segmentId,
- partitionId,
- oldPartitionIdList,
- sqlContext)
-
- val future: Future[Void] = executor.submit(new SplitPartitionCallable(splitModel))
- futureList.add(future)
- }
-
- def executeDroppingPartition(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- executor: ExecutorService,
- segmentId: String,
- partitionId: String,
- dropWithData: Boolean,
- oldPartitionIds: List[Int]): Unit = {
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val model = new DropPartitionCallableModel(carbonLoadModel,
- segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, sqlContext)
- val future: Future[Void] = executor.submit(new DropPartitionCallable(model))
- try {
- future.get
- } catch {
- case e: Exception =>
- LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
- throw e
- }
- }
-
- def prepareCarbonLoadModel(table: CarbonTable, newCarbonLoadModel: CarbonLoadModel): Unit = {
- newCarbonLoadModel.setTableName(table.getFactTableName)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- newCarbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
- newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
- newCarbonLoadModel.setStorePath(table.getStorePath)
- CommonUtil.readLoadMetadataDetails(newCarbonLoadModel)
- val loadStartTime = CarbonUpdateUtil.readCurrentTime();
- newCarbonLoadModel.setFactTimeStamp(loadStartTime)
- }
-
def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
// Deleting the any partially loaded data if present.
// in some case the segment folder which is present in store will not have entry in
@@ -397,39 +217,4 @@ object DataManagementFunc {
}
}
- def cleanFiles(
- dbName: String,
- tableName: String,
- storePath: String,
- carbonTable: CarbonTable,
- forceTableClean: Boolean): Unit = {
- val identifier = new CarbonTableIdentifier(dbName, tableName, "")
- val carbonCleanFilesLock =
- CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.CLEAN_FILES_LOCK)
- try {
- if (carbonCleanFilesLock.lockWithRetries()) {
- LOGGER.info("Clean files lock has been successfully acquired.")
- if (forceTableClean) {
- val absIdent = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
- FileFactory.deleteAllCarbonFilesOfDir(
- FileFactory.getCarbonFile(absIdent.getTablePath,
- FileFactory.getFileType(absIdent.getTablePath)))
- } else {
- deleteLoadsAndUpdateMetadata(dbName, tableName, storePath,
- isForceDeletion = true, carbonTable)
- CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
- }
- } else {
- val errorMsg = "Clean files request is failed for " +
- s"$dbName.$tableName" +
- ". Not able to acquire the clean files lock due to another clean files " +
- "operation is running in the background."
- LOGGER.audit(errorMsg)
- LOGGER.error(errorMsg)
- throw new Exception(errorMsg + " Please try after some time.")
- }
- } finally {
- CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
- }
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index 4d782c9..eb07240 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -35,7 +35,8 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil
*/
object UpdateDataLoad {
- def DataLoadForUpdate(segId: String,
+ def DataLoadForUpdate(
+ segId: String,
index: Int,
iter: Iterator[Row],
carbonLoadModel: CarbonLoadModel,