Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/08 04:15:52 UTC

[18/24] carbondata git commit: [CARBONDATA-1669] Clean up code in CarbonDataRDDFactory
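
This part of the patch moves the alter-partition orchestration out of
CarbonDataRDDFactory and into the command classes themselves:
AlterTableDropCarbonPartitionCommand and AlterTableSplitCarbonPartitionCommand
now own alterTableDropPartition and alterTableSplitPartition. Each command
starts one worker thread per valid segment, each worker submits a
DropPartitionCallable or SplitPartitionCallable to a fixed-size pool sized by
the NUM_CORES_ALT_PARTITION property, and the datamap segments are refreshed
once every worker has joined. A short sketch of each concurrency pattern
follows the corresponding file's diff below.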

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index e0b891a..9b16060 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -18,32 +18,37 @@
 package org.apache.spark.sql.execution.command.partition
 
 import java.util
+import java.util.concurrent.{Executors, ExecutorService, Future}
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDropPartitionModel, DataProcessCommand, RunnableCommand, SchemaProcessCommand}
-import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.execution.command.{AlterTableDropPartitionModel, DataProcessCommand, DropPartitionCallableModel, RunnableCommand, SchemaProcessCommand}
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.partition.DropPartitionCallable
 
 case class AlterTableDropCarbonPartitionCommand(
     model: AlterTableDropPartitionModel)
   extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
-  val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     if (model.partitionId.equals("0")) {
@@ -55,7 +60,6 @@ case class AlterTableDropCarbonPartitionCommand(
   }
 
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
     val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
     val tableName = model.tableName
     val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -151,7 +155,7 @@ case class AlterTableDropCarbonPartitionCommand(
       carbonLoadModel.setStorePath(relation.tableMeta.storePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)
-      CarbonDataRDDFactory.alterTableDropPartition(
+      alterTableDropPartition(
         sparkSession.sqlContext,
         model.partitionId,
         carbonLoadModel,
@@ -173,4 +177,111 @@ case class AlterTableDropCarbonPartitionCommand(
     LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
     Seq.empty
   }
+
+  private def alterTableDropPartition(sqlContext: SQLContext,
+      partitionId: String,
+      carbonLoadModel: CarbonLoadModel,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    LOGGER.audit(s"Drop partition request received for table " +
+                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+    try {
+      startDropThreads(
+        sqlContext,
+        carbonLoadModel,
+        partitionId,
+        dropWithData,
+        oldPartitionIds)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in start dropping partition thread. ${ e.getMessage }")
+        throw e
+    }
+  }
+
+  private def startDropThreads(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      partitionId: String,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    val numberOfCores = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+      CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+    val executor: ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
+    try {
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
+      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
+      var i = 0
+      for (segmentId: String <- validSegments) {
+        threadArray(i) = DropPartitionThread(sqlContext, carbonLoadModel, executor,
+          segmentId, partitionId, dropWithData, oldPartitionIds)
+        threadArray(i).start()
+        i += 1
+      }
+      for (thread <- threadArray) {
+        thread.join()
+      }
+      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      refresher.refreshSegments(validSegments.asJava)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")
+        throw e
+    } finally {
+      executor.shutdown()
+      try {
+        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +
+                       s" ${ e.getMessage }")
+      }
+    }
+  }
+}
+
+case class DropPartitionThread(sqlContext: SQLContext,
+    carbonLoadModel: CarbonLoadModel,
+    executor: ExecutorService,
+    segmentId: String,
+    partitionId: String,
+    dropWithData: Boolean,
+    oldPartitionIds: List[Int]) extends Thread {
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(): Unit = {
+    try {
+      executeDroppingPartition(sqlContext, carbonLoadModel, executor,
+        segmentId, partitionId, dropWithData, oldPartitionIds)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage }")
+    }
+  }
+
+  private def executeDroppingPartition(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      segmentId: String,
+      partitionId: String,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val model = new DropPartitionCallableModel(carbonLoadModel,
+      segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, sqlContext)
+    val future: Future[Void] = executor.submit(new DropPartitionCallable(model))
+    try {
+      future.get
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
+        throw e
+    }
+  }
 }
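
As a minimal, self-contained sketch of the thread-per-segment pattern that
startDropThreads follows; the segment ids and the println body here are
illustrative stand-ins, not CarbonData APIs:

    import java.util.concurrent.{Callable, Executors, ExecutorService, Future}

    object ThreadPerSegmentSketch {
      def main(args: Array[String]): Unit = {
        val validSegments = Seq("0", "1", "2")  // stand-in for SegmentStatusManager's valid segments
        val executor: ExecutorService = Executors.newFixedThreadPool(2)
        try {
          // One worker thread per segment, as in startDropThreads above.
          val workers = validSegments.map { segmentId =>
            new Thread(new Runnable {
              override def run(): Unit = {
                // Each worker hands its real work to the shared pool and blocks
                // on the Future, so a failure surfaces inside that worker.
                val future: Future[Void] = executor.submit(new Callable[Void] {
                  override def call(): Void = {
                    println(s"dropping partition data in segment $segmentId")
                    null
                  }
                })
                future.get()
              }
            })
          }
          workers.foreach(_.start())
          workers.foreach(_.join())  // wait for every segment before refreshing datamaps
        } finally {
          executor.shutdown()  // mirror the finally block above: always release the pool
        }
      }
    }

Each worker does little besides blocking on its pooled Future; the patch
appears to keep this thread-plus-pool layering as it was in
CarbonDataRDDFactory rather than redesigning it.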

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index 12bf31e..c3a918c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -19,29 +19,32 @@ package org.apache.spark.sql.execution.command.partition
 
 import java.text.SimpleDateFormat
 import java.util
+import java.util.concurrent.{Executors, ExecutorService, Future}
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableSplitPartitionModel, DataProcessCommand, RunnableCommand, SchemaProcessCommand}
-import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
+import org.apache.spark.sql.execution.command.{AlterTableSplitPartitionModel, DataProcessCommand, RunnableCommand, SchemaProcessCommand, SplitPartitionCallableModel}
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.partition.SplitPartitionCallable
 
 /**
  * Command for Alter Table Add & Split partition
@@ -51,7 +54,8 @@ case class AlterTableSplitCarbonPartitionCommand(
     splitPartitionModel: AlterTableSplitPartitionModel)
   extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
 
-  val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
 
   // TODO will add rollback function in case of process data failure
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -60,7 +64,6 @@ case class AlterTableSplitCarbonPartitionCommand(
   }
 
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
     val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
     val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val tableName = splitPartitionModel.tableName
@@ -114,8 +117,7 @@ case class AlterTableSplitCarbonPartitionCommand(
     Seq.empty
   }
 
-  private def updatePartitionInfo(partitionInfo: PartitionInfo,
-      partitionIds: List[Int]) = {
+  private def updatePartitionInfo(partitionInfo: PartitionInfo, partitionIds: List[Int]): Unit = {
     val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
         CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
@@ -161,7 +163,7 @@ case class AlterTableSplitCarbonPartitionCommand(
       carbonLoadModel.setStorePath(storePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)
-      CarbonDataRDDFactory.alterTableSplitPartition(
+      alterTableSplitPartition(
         sparkSession.sqlContext,
         splitPartitionModel.partitionId.toInt.toString,
         carbonLoadModel,
@@ -185,4 +187,135 @@
     }
     Seq.empty
   }
+
+  private def alterTableSplitPartition(
+      sqlContext: SQLContext,
+      partitionId: String,
+      carbonLoadModel: CarbonLoadModel,
+      oldPartitionIdList: List[Int]
+  ): Unit = {
+    LOGGER.audit(s"Add partition request received for table " +
+                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+    try {
+      startSplitThreads(sqlContext,
+        carbonLoadModel,
+        partitionId,
+        oldPartitionIdList)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in start splitting partition thread. ${ e.getMessage }")
+        throw e
+    }
+  }
+
+  private def startSplitThreads(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      partitionId: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    val numberOfCores = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+        CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+    val executor: ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
+    try {
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
+      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      val threadArray: Array[SplitThread] = new Array[SplitThread](validSegments.size)
+      var i = 0
+      validSegments.foreach { segmentId =>
+        threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor,
+          segmentId, partitionId, oldPartitionIdList)
+        threadArray(i).start()
+        i += 1
+      }
+      threadArray.foreach {
+        thread => thread.join()
+      }
+      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      refresher.refreshSegments(validSegments.asJava)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception when split partition: ${ e.getMessage }")
+        throw e
+    } finally {
+      executor.shutdown()
+      try {
+        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +
+                       s" ${ e.getMessage }")
+      }
+    }
+  }
+}
+
+case class SplitThread(sqlContext: SQLContext,
+    carbonLoadModel: CarbonLoadModel,
+    executor: ExecutorService,
+    segmentId: String,
+    partitionId: String,
+    oldPartitionIdList: List[Int]) extends Thread {
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(): Unit = {
+    var triggeredSplitPartitionStatus = false
+    var exception: Exception = null
+    try {
+      executePartitionSplit(sqlContext,
+        carbonLoadModel, executor, segmentId, partitionId, oldPartitionIdList)
+      triggeredSplitPartitionStatus = true
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in partition split thread: ${ e.getMessage }")
+        exception = e
+    }
+    if (!triggeredSplitPartitionStatus) {
+      throw new Exception("Exception in split partition " + exception.getMessage)
+    }
+  }
+
+  private def executePartitionSplit(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      segment: String,
+      partitionId: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
+      CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
+    )
+    scanSegmentsForSplitPartition(futureList, executor, segment, partitionId,
+      sqlContext, carbonLoadModel, oldPartitionIdList)
+    try {
+      futureList.asScala.foreach { future =>
+        future.get
+      }
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
+        throw e
+    }
+  }
+
+  private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]],
+      executor: ExecutorService,
+      segmentId: String,
+      partitionId: String,
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      oldPartitionIdList: List[Int]): Unit = {
+
+    val splitModel = SplitPartitionCallableModel(carbonLoadModel,
+      segmentId,
+      partitionId,
+      oldPartitionIdList,
+      sqlContext)
+
+    val future: Future[Void] = executor.submit(new SplitPartitionCallable(splitModel))
+    futureList.add(future)
+  }
 }
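
The split path wires the same pool differently: each SplitThread collects its
pooled work in a futureList and blocks on every entry, so the first failed
future aborts that worker. A minimal sketch of that collect-then-get pattern,
again with illustrative stand-ins rather than CarbonData APIs:

    import java.util
    import java.util.concurrent.{Callable, Executors, Future}

    import scala.collection.JavaConverters._

    object FutureListSketch {
      def main(args: Array[String]): Unit = {
        val executor = Executors.newFixedThreadPool(2)
        val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]]()
        try {
          (1 to 4).foreach { i =>
            futureList.add(executor.submit(new Callable[Void] {
              override def call(): Void = {
                println(s"splitting range $i")  // stand-in for the real split work
                null
              }
            }))
          }
          // Block on every future; the first failure is rethrown (wrapped in an
          // ExecutionException) and aborts the caller, as in executePartitionSplit.
          futureList.asScala.foreach(_.get())
        } finally {
          executor.shutdown()
        }
      }
    }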

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 3fcad74..29de05b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -104,7 +104,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       sql("alter table reverttest rename to revert")
     }
     AlterTableUtil.releaseLocks(locks)
-    assert(exception.getMessage == "Alter table rename table operation failed: Table is locked for updation. Please try after some time")
+    assert(exception.getMessage == "Alter table rename table operation failed: Acquire table lock failed after retry, please try after some time")
   }
 
   override def afterAll() {