You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:23 UTC

[05/50] [abbrv] carbondata git commit: [CARBONDATA-2049] CarbonCleanFilesCommand table path problem

[CARBONDATA-2049] CarbonCleanFilesCommand table path problem

Problem:
In CarbonCleanFilesCommand, databaseLocation is being passed instead of the tablePath in case of force clean.
And in case of cleanGarbageData, storeLocation is being passed instead of the tablePath.

This closes #1828


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9b479617
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9b479617
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9b479617

Branch: refs/heads/fgdatamap
Commit: 9b4796177610e3a4f9d426169753a40eceb7b675
Parents: d509f17
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Tue Jan 16 11:49:54 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Jan 31 09:37:39 2018 +0800

----------------------------------------------------------------------
 .../org/apache/carbondata/api/CarbonStore.scala | 32 ++++++++++++++------
 .../management/CarbonCleanFilesCommand.scala    | 25 +++++++--------
 .../org/apache/spark/util/CleanFiles.scala      | 32 ++++++++++++++------
 .../apache/spark/util/CarbonCommandSuite.scala  |  3 +-
 4 files changed, 59 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index d514f77..c02ba0a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -98,34 +98,46 @@ object CarbonStore {
     }
   }
 
+  /**
+   * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+   *
+   * @param dbName          : Database name
+   * @param tableName       : Table name
+   * @param tablePath       : Table path
+   * @param carbonTable     : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean : <true> for force clean it will delete all data
+   *                        <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
   def cleanFiles(
       dbName: String,
       tableName: String,
-      storePath: String,
+      tablePath: String,
       carbonTable: CarbonTable,
       forceTableClean: Boolean,
       currentTablePartitions: Option[Seq[String]] = None): Unit = {
     LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
     var carbonCleanFilesLock: ICarbonLock = null
-    var absoluteTableIdentifier: AbsoluteTableIdentifier = null
-    if (forceTableClean) {
-      absoluteTableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
     } else {
-      absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      carbonTable.getAbsoluteTableIdentifier
     }
     try {
       val errorMsg = "Clean files request is failed for " +
                      s"$dbName.$tableName" +
                      ". Not able to acquire the clean files lock due to another clean files " +
                      "operation is running in the background."
-      carbonCleanFilesLock =
-        CarbonLockUtil.getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // in case of force clean the lock is not required
       if (forceTableClean) {
-        val absIdent = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
         FileFactory.deleteAllCarbonFilesOfDir(
-          FileFactory.getCarbonFile(absIdent.getTablePath,
-            FileFactory.getFileType(absIdent.getTablePath)))
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath,
+            FileFactory.getFileType(absoluteTableIdentifier.getTablePath)))
       } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
         DataLoadingUtil.deleteLoadsAndUpdateMetadata(
           isForceDeletion = true, carbonTable)
         CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 303c3ef..4b68bd0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -70,12 +70,13 @@ case class CarbonCleanFilesCommand(
       databaseNameOp: Option[String], tableName: String): Unit = {
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
     CarbonStore.cleanFiles(
-      dbName,
-      tableName,
-      databaseLocation,
-      null,
-      forceTableClean)
+      dbName = dbName,
+      tableName = tableName,
+      tablePath = tablePath,
+      carbonTable = null, // in case of delete all data carbonTable is not required.
+      forceTableClean = forceTableClean)
   }
 
   private def cleanGarbageData(sparkSession: SparkSession,
@@ -90,12 +91,12 @@ case class CarbonCleanFilesCommand(
       None
     }
     CarbonStore.cleanFiles(
-      CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
-      tableName,
-      CarbonProperties.getStorePath,
-      carbonTable,
-      forceTableClean,
-      partitions)
+      dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
+      tableName = tableName,
+      tablePath = carbonTable.getTablePath,
+      carbonTable = carbonTable,
+      forceTableClean = forceTableClean,
+      currentTablePartitions = partitions)
   }
 
   // Clean garbage data in all tables in all databases

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index eba7dcd..d4d9a84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -29,19 +29,30 @@ import org.apache.carbondata.api.CarbonStore
 object CleanFiles {
 
   /**
-   * Clean the stale segments from table
-   * @param spark
-   * @param dbName
-   * @param tableName
-   * @param storePath
-   * @param forceTableClean if true, it deletes the table and its contents with force.It does not
+   * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+   *
+   * @param spark           : Spark session
+   * @param dbName          : Database name
+   * @param tableName       : Table name
+   * @param forceTableClean : if true, it deletes the table and its contents with force.It does not
    *                        drop table from hive metastore so should be very careful to use it.
    */
   def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
-      storePath: String, forceTableClean: Boolean = false): Unit = {
+     forceTableClean: Boolean = false): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
-    CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean)
+    val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(spark)
+    val carbonTable = if (!forceTableClean) {
+      CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
+    } else {
+      null
+    }
+    CarbonStore.cleanFiles(
+      dbName = dbName,
+      tableName = tableName,
+      tablePath = tablePath,
+      carbonTable = carbonTable,
+      forceTableClean = forceTableClean)
   }
 
   def main(args: Array[String]): Unit = {
@@ -60,6 +71,7 @@ object CleanFiles {
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
     CarbonEnv.getInstance(spark).carbonMetastore.
       checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
-    cleanFiles(spark, dbName, tableName, storePath, forceTableClean)
+
+    cleanFiles(spark, dbName, tableName, forceTableClean)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index e493179..8ff6cab 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -142,7 +142,8 @@ class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll {
     dropTable(table)
     createAndLoadTestTable(table, "csv_table")
     CleanFiles.main(Array(s"${location}", table, "true"))
-    val tablePath = s"${location}${File.separator}default${File.separator}$table"
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", table)
+    val tablePath = carbonTable.getTablePath
     val f = new File(tablePath)
     assert(!f.exists())