You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2017/12/13 04:40:26 UTC

carbondata git commit: [CARBONDATA-1886] Check and Delete stale segment folders on new load

Repository: carbondata
Updated Branches:
  refs/heads/master 970a32a26 -> 5e3aec43e


[CARBONDATA-1886] Check and Delete stale segment folders on new load

Segment folders are not deleted if the corresponding entry is not available in the table status file. Due to this, queries return a higher record count than the actual count.

This closes #1646


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5e3aec43
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5e3aec43
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5e3aec43

Branch: refs/heads/master
Commit: 5e3aec43ed65f48a63a396aac016e1ac06ef8589
Parents: 970a32a
Author: kunal642 <ku...@gmail.com>
Authored: Tue Dec 12 16:25:31 2017 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Wed Dec 13 10:13:03 2017 +0530

----------------------------------------------------------------------
 .../spark/testsuite/dataload/TestLoadDataGeneral.scala | 13 ++++++++++++-
 .../command/management/CarbonLoadDataCommand.scala     |  9 ++++++---
 .../processing/loading/TableProcessingOperations.java  |  3 +--
 3 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index e3d497a..49f3c5e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.spark.sql.test.util.QueryTest
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
@@ -191,6 +190,18 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
 
   }
 
+  test("test if stale folders are deleting on data load") {
+    sql("drop table if exists stale")
+    sql("create table stale(a string) stored by 'carbondata'")
+    sql("insert into stale values('k')")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "stale")
+    val tableStatusFile = new CarbonTablePath(null,
+      carbonTable.getTablePath).getTableStatusFilePath
+    FileFactory.getCarbonFile(tableStatusFile).delete()
+    sql("insert into stale values('k')")
+    checkAnswer(sql("select * from stale"), Row("k"))
+  }
+
   override def afterAll {
     sql("DROP TABLE if exists loadtest")
     sql("drop table if exists invalidMeasures")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index ebdaa33..9f6fce1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
-import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
+import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{CausedBy, FileUtils}
 
@@ -42,9 +42,9 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format
 import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.merger.CompactionType
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
@@ -139,7 +139,10 @@ case class CarbonLoadDataCommand(
         carbonLoadModel,
         hadoopConf
       )
-
+      // Delete stale segment folders that are not in table status but are physically present in
+      // the Fact folder
+      LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+      TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
       try {
         val operationContext = new OperationContext
         val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index cb53d6e..e2be79c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -74,8 +74,7 @@ public class TableProcessingOperations {
                 CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
             boolean found = false;
             for (int j = 0; j < details.length; j++) {
-              if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
-                  .equals(partitionCount)) {
+              if (details[j].getLoadName().equals(segmentId)) {
                 found = true;
                 break;
               }