You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2017/12/13 12:42:19 UTC

carbondata git commit: [CARBONDATA-1876] Clean all the InProgress segments for all databases during session initialization

Repository: carbondata
Updated Branches:
  refs/heads/master 4158f3d96 -> 6a185b834


[CARBONDATA-1876] Clean all the InProgress segments for all databases during session initialization

Clean all the InProgress segments for all databases during session initialization. when carbon session initialize, clean all the in progress segments for all the databases created by user.

This closes #1637


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6a185b83
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6a185b83
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6a185b83

Branch: refs/heads/master
Commit: 6a185b8341f6f36f21b325c961a0a33ab17b15aa
Parents: 4158f3d
Author: akashrn5 <ak...@gmail.com>
Authored: Fri Dec 8 04:36:55 2017 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Wed Dec 13 18:15:11 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  2 +-
 .../carbondata/spark/util/CommonUtil.scala      | 24 +++++++++-----------
 .../org/apache/spark/sql/CarbonSession.scala    |  7 ++++--
 3 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a185b83/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 4ab9919..959f4d4 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1302,7 +1302,7 @@ public final class CarbonCommonConstants {
    */
   public static final String DATA_MANAGEMENT_DRIVER = "spark.carbon.datamanagement.driver";
 
-  public static final String DATA_MANAGEMENT_DRIVER_DEFAULT = "false";
+  public static final String DATA_MANAGEMENT_DRIVER_DEFAULT = "true";
 
   public static final String CARBON_SESSIONSTATE_CLASSNAME = "spark.carbon.sessionstate.classname";
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a185b83/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 3595884..943c0a5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -760,10 +760,10 @@ object CommonUtil {
   /**
    * The in-progress segments which are left when the driver is down will be marked as deleted
    * when driver is initializing.
-   * @param storePath
-   * @param sparkContext
+   * @param databaseLocation
+   * @param dbName
    */
-  def cleanInProgressSegments(storePath: String, sparkContext: SparkContext): Unit = {
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
     val loaderDriver = CarbonProperties.getInstance().
       getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
         CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
@@ -771,18 +771,17 @@ object CommonUtil {
       return
     }
     try {
-      val fileType = FileFactory.getFileType(storePath)
-      if (FileFactory.isFileExist(storePath, fileType)) {
-        val file = FileFactory.getCarbonFile(storePath, fileType)
-        val databaseFolders = file.listFiles()
-        databaseFolders.foreach { databaseFolder =>
-          if (databaseFolder.isDirectory) {
-            val tableFolders = databaseFolder.listFiles()
+      val fileType = FileFactory.getFileType(databaseLocation)
+      if (FileFactory.isFileExist(databaseLocation, fileType)) {
+        val file = FileFactory.getCarbonFile(databaseLocation, fileType)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
             tableFolders.foreach { tableFolder =>
               if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
                 val identifier =
-                  AbsoluteTableIdentifier.from(storePath,
-                    databaseFolder.getName, tableFolder.getName)
+                  AbsoluteTableIdentifier.from(tablePath, dbName, tableFolder.getName)
                 val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
                 val tableStatusFile = carbonTablePath.getTableStatusFilePath
                 if (FileFactory.isFileExist(tableStatusFile, fileType)) {
@@ -826,7 +825,6 @@ object CommonUtil {
               }
             }
           }
-        }
       }
     } catch {
       case s: java.io.FileNotFoundException =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a185b83/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index e6ee535..c9b134e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -182,8 +182,11 @@ object CarbonSession {
         options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
         SparkSession.setDefaultSession(session)
         try {
-          CommonUtil.cleanInProgressSegments(
-            carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION), sparkContext)
+          val databases = session.sessionState.catalog.listDatabases()
+          databases.foreach(dbName => {
+            val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, session)
+            CommonUtil.cleanInProgressSegments(databaseLocation, dbName)
+          })
         } catch {
           case e: Throwable =>
             // catch all exceptions to avoid CarbonSession initialization failure