Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:23 UTC
[05/50] [abbrv] carbondata git commit: [CARBONDATA-2049] CarbonCleanFilesCommand table path problem
[CARBONDATA-2049] CarbonCleanFilesCommand table path problem
Problem:
In CarbonCleanFilesCommand, databaseLocation is being passed instead of the tablePath in case of force clean.
And in case of cleanGarbageData, storeLocation is being passed instead of the tablePath.
This closes #1828
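For illustration, a minimal before/after sketch of the force-clean call site (a hypothetical standalone snippet with placeholder values; it assumes the CarbonData API exactly as patched below):

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.core.constants.CarbonCommonConstants

object ForceCleanSketch {
  def main(args: Array[String]): Unit = {
    val dbName = "default"                // placeholder values
    val tableName = "t1"
    val databaseLocation = "/user/warehouse/carbon.store/default"

    // Before the fix, databaseLocation itself was passed as the path, so a
    // force clean operated one directory level too high. The fix derives the
    // table path first and passes that instead:
    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
    CarbonStore.cleanFiles(
      dbName = dbName,
      tableName = tableName,
      tablePath = tablePath,
      carbonTable = null,                 // not required for a force clean
      forceTableClean = true)
  }
}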
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9b479617
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9b479617
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9b479617
Branch: refs/heads/fgdatamap
Commit: 9b4796177610e3a4f9d426169753a40eceb7b675
Parents: d509f17
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Tue Jan 16 11:49:54 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Jan 31 09:37:39 2018 +0800
----------------------------------------------------------------------
.../org/apache/carbondata/api/CarbonStore.scala | 32 ++++++++++++++------
.../management/CarbonCleanFilesCommand.scala | 25 +++++++--------
.../org/apache/spark/util/CleanFiles.scala | 32 ++++++++++++++------
.../apache/spark/util/CarbonCommandSuite.scala | 3 +-
4 files changed, 59 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index d514f77..c02ba0a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -98,34 +98,46 @@ object CarbonStore {
}
}
+ /**
+ * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+ * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+ *
+ * @param dbName : Database name
+ * @param tableName : Table name
+ * @param tablePath : Table path
+ * @param carbonTable : CarbonTable object, <null> in case of force clean
+ * @param forceTableClean : <true> for force clean; it will delete all data
+ * <false> for cleaning garbage segments (MARKED_FOR_DELETE state)
+ * @param currentTablePartitions : Hive partition details
+ */
def cleanFiles(
dbName: String,
tableName: String,
- storePath: String,
+ tablePath: String,
carbonTable: CarbonTable,
forceTableClean: Boolean,
currentTablePartitions: Option[Seq[String]] = None): Unit = {
LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
var carbonCleanFilesLock: ICarbonLock = null
- var absoluteTableIdentifier: AbsoluteTableIdentifier = null
- if (forceTableClean) {
- absoluteTableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
+ val absoluteTableIdentifier = if (forceTableClean) {
+ AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
} else {
- absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+ carbonTable.getAbsoluteTableIdentifier
}
try {
val errorMsg = "Clean files request is failed for " +
s"$dbName.$tableName" +
". Not able to acquire the clean files lock due to another clean files " +
"operation is running in the background."
- carbonCleanFilesLock =
- CarbonLockUtil.getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+ // in case of force clean the lock is not required
if (forceTableClean) {
- val absIdent = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
FileFactory.deleteAllCarbonFilesOfDir(
- FileFactory.getCarbonFile(absIdent.getTablePath,
- FileFactory.getFileType(absIdent.getTablePath)))
+ FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath,
+ FileFactory.getFileType(absoluteTableIdentifier.getTablePath)))
} else {
+ carbonCleanFilesLock =
+ CarbonLockUtil
+ .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
DataLoadingUtil.deleteLoadsAndUpdateMetadata(
isForceDeletion = true, carbonTable)
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
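A hedged usage sketch of the reworked method in its non-force mode (assuming a CarbonTable already resolved through CarbonEnv; in this mode the method acquires the clean-files lock and removes only MARKED_FOR_DELETE segments):

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

object GarbageCleanSketch {
  // carbonTable is assumed to be resolved beforehand, e.g. via
  // CarbonEnv.getCarbonTable(Some("default"), "t1")(sparkSession)
  def cleanGarbageSegments(dbName: String, tableName: String,
      carbonTable: CarbonTable): Unit = {
    CarbonStore.cleanFiles(
      dbName = dbName,
      tableName = tableName,
      tablePath = carbonTable.getTablePath, // table path now comes from the table itself
      carbonTable = carbonTable,
      forceTableClean = false)              // acquires CLEAN_FILES_LOCK first
  }
}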
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 303c3ef..4b68bd0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.spark.util.CommonUtil
@@ -70,12 +70,13 @@ case class CarbonCleanFilesCommand(
databaseNameOp: Option[String], tableName: String): Unit = {
val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+ val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
CarbonStore.cleanFiles(
- dbName,
- tableName,
- databaseLocation,
- null,
- forceTableClean)
+ dbName = dbName,
+ tableName = tableName,
+ tablePath = tablePath,
+ carbonTable = null, // carbonTable is not required when deleting all data.
+ forceTableClean = forceTableClean)
}
private def cleanGarbageData(sparkSession: SparkSession,
@@ -90,12 +91,12 @@ case class CarbonCleanFilesCommand(
None
}
CarbonStore.cleanFiles(
- CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
- tableName,
- CarbonProperties.getStorePath,
- carbonTable,
- forceTableClean,
- partitions)
+ dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
+ tableName = tableName,
+ tablePath = carbonTable.getTablePath,
+ carbonTable = carbonTable,
+ forceTableClean = forceTableClean,
+ currentTablePartitions = partitions)
}
// Clean garbage data in all tables in all databases
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index eba7dcd..d4d9a84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -29,19 +29,30 @@ import org.apache.carbondata.api.CarbonStore
object CleanFiles {
/**
- * Clean the stale segments from table
- * @param spark
- * @param dbName
- * @param tableName
- * @param storePath
- * @param forceTableClean if true, it deletes the table and its contents with force.It does not
+ * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+ * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+ *
+ * @param spark : SparkSession
+ * @param dbName : Database name
+ * @param tableName : Table name
+ * @param forceTableClean : if true, it deletes the table and its contents with force. It does not
* drop the table from the Hive metastore, so be very careful when using it.
*/
def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
- storePath: String, forceTableClean: Boolean = false): Unit = {
+ forceTableClean: Boolean = false): Unit = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
- val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
- CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean)
+ val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(spark)
+ val carbonTable = if (!forceTableClean) {
+ CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
+ } else {
+ null
+ }
+ CarbonStore.cleanFiles(
+ dbName = dbName,
+ tableName = tableName,
+ tablePath = tablePath,
+ carbonTable = carbonTable,
+ forceTableClean = forceTableClean)
}
def main(args: Array[String]): Unit = {
@@ -60,6 +71,7 @@ object CleanFiles {
val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
CarbonEnv.getInstance(spark).carbonMetastore.
checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
- cleanFiles(spark, dbName, tableName, storePath, forceTableClean)
+
+ cleanFiles(spark, dbName, tableName, forceTableClean)
}
}
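After this change the programmatic entry point no longer takes a storePath; the table path is resolved internally via CarbonEnv.getTablePath. A hedged sketch of the new call (ForceCleanDemo is illustrative; spark is assumed to be a Carbon-enabled SparkSession):

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CleanFiles

object ForceCleanDemo {
  def run(spark: SparkSession): Unit = {
    // Callers now pass only the identifiers; forceTableClean = true deletes
    // all table data, as documented above.
    CleanFiles.cleanFiles(spark, dbName = "default", tableName = "t1",
      forceTableClean = true)
  }
}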
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index e493179..8ff6cab 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -142,7 +142,8 @@ class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll {
dropTable(table)
createAndLoadTestTable(table, "csv_table")
CleanFiles.main(Array(s"${location}", table, "true"))
- val tablePath = s"${location}${File.separator}default${File.separator}$table"
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", table)
+ val tablePath = carbonTable.getTablePath
val f = new File(tablePath)
assert(!f.exists())