You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/08 04:15:52 UTC
[18/24] carbondata git commit: [CARBONDATA-1669] Clean up code in
CarbonDataRDDFactory
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index e0b891a..9b16060 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -18,32 +18,37 @@
package org.apache.spark.sql.execution.command.partition
import java.util
+import java.util.concurrent.{Executors, ExecutorService, Future}
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDropPartitionModel, DataProcessCommand, RunnableCommand, SchemaProcessCommand}
-import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.execution.command.{AlterTableDropPartitionModel, DataProcessCommand, DropPartitionCallableModel, RunnableCommand, SchemaProcessCommand}
+import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.partition.DropPartitionCallable
case class AlterTableDropCarbonPartitionCommand(
model: AlterTableDropPartitionModel)
extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
- val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+
+ private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
override def run(sparkSession: SparkSession): Seq[Row] = {
if (model.partitionId.equals("0")) {
@@ -55,7 +60,6 @@ case class AlterTableDropCarbonPartitionCommand(
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
val tableName = model.tableName
val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -151,7 +155,7 @@ case class AlterTableDropCarbonPartitionCommand(
carbonLoadModel.setStorePath(relation.tableMeta.storePath)
val loadStartTime = CarbonUpdateUtil.readCurrentTime
carbonLoadModel.setFactTimeStamp(loadStartTime)
- CarbonDataRDDFactory.alterTableDropPartition(
+ alterTableDropPartition(
sparkSession.sqlContext,
model.partitionId,
carbonLoadModel,
@@ -173,4 +177,111 @@ case class AlterTableDropCarbonPartitionCommand(
LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
Seq.empty
}
+
+  /**
+   * Entry point for dropping a partition's data: records an audit entry for the request and
+   * delegates the work to `startDropThreads`. An exception raised by the delegate is logged
+   * and rethrown unchanged.
+   *
+   * @param sqlContext      SQL context forwarded to the drop workers
+   * @param partitionId     id of the partition to drop
+   * @param carbonLoadModel load model describing the target table
+   * @param dropWithData    forwarded as-is to the drop workers
+   * @param oldPartitionIds partition ids forwarded as-is to the drop workers
+   */
+  private def alterTableDropPartition(sqlContext: SQLContext,
+      partitionId: String,
+      carbonLoadModel: CarbonLoadModel,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    val databaseName = carbonLoadModel.getDatabaseName
+    val tableName = carbonLoadModel.getTableName
+    LOGGER.audit(s"Drop partition request received for table $databaseName.$tableName")
+    try {
+      startDropThreads(sqlContext, carbonLoadModel, partitionId, dropWithData, oldPartitionIds)
+    } catch {
+      case failure: Exception =>
+        LOGGER.error(s"Exception in start dropping partition thread. ${ failure.getMessage }")
+        throw failure
+    }
+  }
+
+  /**
+   * Drops the partition from every valid segment of the table, using one worker thread per
+   * segment.
+   *
+   * Each worker submits its task to a fixed-size pool whose size is read from
+   * `CarbonCommonConstants.NUM_CORES_ALT_PARTITION`. Once every worker has been joined, the
+   * valid segments are refreshed through the `DataMapStoreManager` segment refresher. The pool
+   * is always shut down and partial-load cleanup is always attempted, even on failure.
+   *
+   * A failure in this method is logged with its stack trace and rethrown, so the caller does
+   * not go on to report a failed drop as successful.
+   *
+   * NOTE: a worker thread logs its own failure inside `run()`; `join()` does not surface it
+   * here, so only failures raised on the calling thread are propagated.
+   *
+   * @param sqlContext      SQL context forwarded to each worker
+   * @param carbonLoadModel load model describing the target table
+   * @param partitionId     id of the partition to drop
+   * @param dropWithData    forwarded as-is to each worker
+   * @param oldPartitionIds partition ids forwarded as-is to each worker
+   */
+  private def startDropThreads(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      partitionId: String,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    val numberOfCores = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+      CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+    val executor: ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
+    try {
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
+      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      // Start one worker per valid segment, then wait for all of them to finish.
+      val workers = validSegments.map { segmentId =>
+        val worker = dropPartitionThread(sqlContext, carbonLoadModel, executor,
+          segmentId, partitionId, dropWithData, oldPartitionIds)
+        worker.start()
+        worker
+      }
+      workers.foreach(_.join())
+      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      refresher.refreshSegments(validSegments.asJava)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception when dropping partition: ${ e.getMessage }")
+        // Propagate so the caller's error handling runs instead of reporting success.
+        throw e
+    } finally {
+      executor.shutdown()
+      try {
+        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+      } catch {
+        case e: Exception =>
+          // Cleanup is best effort: log and continue so the primary outcome is not masked.
+          LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +
+                       s" ${ e.getMessage }")
+      }
+    }
+  }
+}
+
+/**
+ * Worker thread that drops one partition from a single segment.
+ *
+ * The thread itself does no heavy work: it submits a `DropPartitionCallable` to the shared
+ * executor and blocks until that task completes. A failure is logged and NOT rethrown from
+ * `run()`, so a failing segment does not kill the thread with an uncaught exception.
+ *
+ * @param sqlContext      SQL context handed to the callable model
+ * @param carbonLoadModel load model describing the target table
+ * @param executor        pool on which the drop task is executed
+ * @param segmentId       segment this worker is responsible for
+ * @param partitionId     id of the partition to drop
+ * @param dropWithData    forwarded as-is to the callable model
+ * @param oldPartitionIds partition ids forwarded as-is to the callable model
+ */
+case class dropPartitionThread(sqlContext: SQLContext,
+    carbonLoadModel: CarbonLoadModel,
+    executor: ExecutorService,
+    segmentId: String,
+    partitionId: String,
+    dropWithData: Boolean,
+    oldPartitionIds: List[Int]) extends Thread {
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(): Unit = {
+    try {
+      executeDroppingPartition(sqlContext, carbonLoadModel, executor,
+        segmentId, partitionId, dropWithData, oldPartitionIds)
+    } catch {
+      case e: Exception =>
+        // Use the class-level logger; the stack trace was already logged by
+        // executeDroppingPartition, so only the summary is logged here.
+        LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage }")
+    }
+  }
+
+  /**
+   * Submits the drop task for this segment to the executor and waits for it to finish.
+   * Any exception from the task is logged with its stack trace and rethrown.
+   */
+  private def executeDroppingPartition(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      segmentId: String,
+      partitionId: String,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val model = new DropPartitionCallableModel(carbonLoadModel,
+      segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, sqlContext)
+    val future: Future[Void] = executor.submit(new DropPartitionCallable(model))
+    try {
+      future.get
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
+        throw e
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index 12bf31e..c3a918c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -19,29 +19,32 @@ package org.apache.spark.sql.execution.command.partition
import java.text.SimpleDateFormat
import java.util
+import java.util.concurrent.{Executors, ExecutorService, Future}
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableSplitPartitionModel, DataProcessCommand, RunnableCommand, SchemaProcessCommand}
-import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
+import org.apache.spark.sql.execution.command.{AlterTableSplitPartitionModel, DataProcessCommand, RunnableCommand, SchemaProcessCommand, SplitPartitionCallableModel}
+import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.partition.SplitPartitionCallable
/**
* Command for Alter Table Add & Split partition
@@ -51,7 +54,8 @@ case class AlterTableSplitCarbonPartitionCommand(
splitPartitionModel: AlterTableSplitPartitionModel)
extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
- val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+ private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
// TODO will add rollback function in case of process data failure
override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -60,7 +64,6 @@ case class AlterTableSplitCarbonPartitionCommand(
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val tableName = splitPartitionModel.tableName
@@ -114,8 +117,7 @@ case class AlterTableSplitCarbonPartitionCommand(
Seq.empty
}
- private def updatePartitionInfo(partitionInfo: PartitionInfo,
- partitionIds: List[Int]) = {
+ private def updatePartitionInfo(partitionInfo: PartitionInfo, partitionIds: List[Int]): Unit = {
val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
@@ -161,7 +163,7 @@ case class AlterTableSplitCarbonPartitionCommand(
carbonLoadModel.setStorePath(storePath)
val loadStartTime = CarbonUpdateUtil.readCurrentTime
carbonLoadModel.setFactTimeStamp(loadStartTime)
- CarbonDataRDDFactory.alterTableSplitPartition(
+ alterTableSplitPartition(
sparkSession.sqlContext,
splitPartitionModel.partitionId.toInt.toString,
carbonLoadModel,
@@ -185,4 +187,136 @@ case class AlterTableSplitCarbonPartitionCommand(
}
Seq.empty
}
+
+  /**
+   * Entry point for the add/split partition data step: records an audit entry for the request
+   * and delegates the work to `startSplitThreads`. An exception raised by the delegate is
+   * logged and rethrown unchanged.
+   *
+   * @param sqlContext         SQL context forwarded to the split workers
+   * @param partitionId        id of the partition being split
+   * @param carbonLoadModel    load model describing the target table
+   * @param oldPartitionIdList partition ids forwarded as-is to the split workers
+   */
+  private def alterTableSplitPartition(
+      sqlContext: SQLContext,
+      partitionId: String,
+      carbonLoadModel: CarbonLoadModel,
+      oldPartitionIdList: List[Int]
+  ): Unit = {
+    val databaseName = carbonLoadModel.getDatabaseName
+    val tableName = carbonLoadModel.getTableName
+    LOGGER.audit(s"Add partition request received for table $databaseName.$tableName")
+    try {
+      startSplitThreads(sqlContext, carbonLoadModel, partitionId, oldPartitionIdList)
+    } catch {
+      case failure: Exception =>
+        LOGGER.error(s"Exception in start splitting partition thread. ${ failure.getMessage }")
+        throw failure
+    }
+  }
+
+  /**
+   * Splits the partition in every valid segment of the table, using one `SplitThread` per
+   * segment.
+   *
+   * Workers share a fixed-size pool whose size is read from
+   * `CarbonCommonConstants.NUM_CORES_ALT_PARTITION`. After all workers have been joined, the
+   * valid segments are refreshed through the `DataMapStoreManager` segment refresher. Failures
+   * are logged and rethrown; the pool is always shut down and partial-load cleanup is always
+   * attempted.
+   *
+   * @param sqlContext         SQL context forwarded to each worker
+   * @param carbonLoadModel    load model describing the target table
+   * @param partitionId        id of the partition being split
+   * @param oldPartitionIdList partition ids forwarded as-is to each worker
+   */
+  private def startSplitThreads(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      partitionId: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    val configuredCores = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+      CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+    val executor: ExecutorService = Executors.newFixedThreadPool(configuredCores.toInt)
+    try {
+      val tableIdentifier =
+        carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
+      val statusManager = new SegmentStatusManager(tableIdentifier)
+      val validSegments = statusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      // Launch one worker per valid segment, then block until every worker has finished.
+      val workers = validSegments.map { segmentId =>
+        val worker = SplitThread(sqlContext, carbonLoadModel, executor,
+          segmentId, partitionId, oldPartitionIdList)
+        worker.start()
+        worker
+      }
+      workers.foreach(_.join())
+      val refreshTarget = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+      DataMapStoreManager.getInstance()
+        .getTableSegmentRefresher(refreshTarget)
+        .refreshSegments(validSegments.asJava)
+    } catch {
+      case failure: Exception =>
+        LOGGER.error(s"Exception when split partition: ${ failure.getMessage }")
+        throw failure
+    } finally {
+      executor.shutdown()
+      try {
+        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+      } catch {
+        case cleanupFailure: Exception =>
+          LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +
+                       s" ${ cleanupFailure.getMessage }")
+      }
+    }
+  }
+}
+
+/**
+ * Worker thread that splits one partition within a single segment.
+ *
+ * The thread submits a `SplitPartitionCallable` to the shared executor and blocks until that
+ * task completes. On failure it logs the error and throws a new exception from `run()` that
+ * carries the original failure as its cause.
+ *
+ * @param sqlContext         SQL context handed to the callable model
+ * @param carbonLoadModel    load model describing the target table
+ * @param executor           pool on which the split task is executed
+ * @param segmentId          segment this worker is responsible for
+ * @param partitionId        id of the partition being split
+ * @param oldPartitionIdList partition ids forwarded as-is to the callable model
+ */
+case class SplitThread(sqlContext: SQLContext,
+    carbonLoadModel: CarbonLoadModel,
+    executor: ExecutorService,
+    segmentId: String,
+    partitionId: String,
+    oldPartitionIdList: List[Int]) extends Thread {
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(): Unit = {
+    try {
+      executePartitionSplit(sqlContext,
+        carbonLoadModel, executor, segmentId, partitionId, oldPartitionIdList)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in partition split thread: ${ e.getMessage }")
+        // Same message as before, but keep the original failure as the cause so the
+        // stack trace is not lost.
+        throw new Exception("Exception in split partition " + e.getMessage, e)
+    }
+  }
+
+  /**
+   * Submits the split task for the given segment and waits for every submitted future.
+   * Any exception from a task is logged with its stack trace and rethrown.
+   */
+  private def executePartitionSplit(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      segment: String,
+      partitionId: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
+      CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
+    )
+    scanSegmentsForSplitPartition(futureList, executor, segment, partitionId,
+      sqlContext, carbonLoadModel, oldPartitionIdList)
+    try {
+      futureList.asScala.foreach(_.get)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
+        throw e
+    }
+  }
+
+  /**
+   * Builds the callable model for one segment, submits the split task to the executor and
+   * records the resulting future in `futureList`.
+   */
+  private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]],
+      executor: ExecutorService,
+      segmentId: String,
+      partitionId: String,
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      oldPartitionIdList: List[Int]): Unit = {
+    val splitModel = SplitPartitionCallableModel(carbonLoadModel,
+      segmentId,
+      partitionId,
+      oldPartitionIdList,
+      sqlContext)
+    val future: Future[Void] = executor.submit(new SplitPartitionCallable(splitModel))
+    futureList.add(future)
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 3fcad74..29de05b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -104,7 +104,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
sql("alter table reverttest rename to revert")
}
AlterTableUtil.releaseLocks(locks)
- assert(exception.getMessage == "Alter table rename table operation failed: Table is locked for updation. Please try after some time")
+ assert(exception.getMessage == "Alter table rename table operation failed: Acquire table lock failed after retry, please try after some time")
}
override def afterAll() {