You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2017/12/13 04:40:26 UTC
carbondata git commit: [CARBONDATA-1886] Check and Delete stale
segment folders on new load
Repository: carbondata
Updated Branches:
refs/heads/master 970a32a26 -> 5e3aec43e
[CARBONDATA-1886] Check and Delete stale segment folders on new load
Segment folders are not deleted when their corresponding entry is missing from the table status file. As a result, queries return a higher record count than the actual one.
This closes #1646
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5e3aec43
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5e3aec43
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5e3aec43
Branch: refs/heads/master
Commit: 5e3aec43ed65f48a63a396aac016e1ac06ef8589
Parents: 970a32a
Author: kunal642 <ku...@gmail.com>
Authored: Tue Dec 12 16:25:31 2017 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Wed Dec 13 10:13:03 2017 +0530
----------------------------------------------------------------------
.../spark/testsuite/dataload/TestLoadDataGeneral.scala | 13 ++++++++++++-
.../command/management/CarbonLoadDataCommand.scala | 9 ++++++---
.../processing/loading/TableProcessingOperations.java | 3 +--
3 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index e3d497a..49f3c5e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.spark.sql.test.util.QueryTest
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
@@ -191,6 +190,18 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
}
+ test("test if stale folders are deleting on data load") {
+ sql("drop table if exists stale")
+ sql("create table stale(a string) stored by 'carbondata'")
+ sql("insert into stale values('k')")
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "stale")
+ val tableStatusFile = new CarbonTablePath(null,
+ carbonTable.getTablePath).getTableStatusFilePath
+ FileFactory.getCarbonFile(tableStatusFile).delete()
+ sql("insert into stale values('k')")
+ checkAnswer(sql("select * from stale"), Row("k"))
+ }
+
override def afterAll {
sql("DROP TABLE if exists loadtest")
sql("drop table if exists invalidMeasures")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index ebdaa33..9f6fce1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
-import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
+import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.{CausedBy, FileUtils}
@@ -42,9 +42,9 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format
import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.merger.CompactionType
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
@@ -139,7 +139,10 @@ case class CarbonLoadDataCommand(
carbonLoadModel,
hadoopConf
)
-
+ // Delete stale segment folders that are not in table status but are physically present in
+ // the Fact folder
+ LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+ TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
try {
val operationContext = new OperationContext
val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index cb53d6e..e2be79c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -74,8 +74,7 @@ public class TableProcessingOperations {
CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
boolean found = false;
for (int j = 0; j < details.length; j++) {
- if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
- .equals(partitionCount)) {
+ if (details[j].getLoadName().equals(segmentId)) {
found = true;
break;
}