Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/09 18:21:53 UTC

[05/36] carbondata git commit: [CARBONDATA-2150] Unwanted update table status files are generated for delete operations where no records are deleted

[CARBONDATA-2150] Unwanted update table status files are generated for delete operations where no records are deleted

Problem:
Unwanted update table status files are generated for delete operations where no records are deleted.

Analysis:
When the filter value of a delete operation is less than the maximum value in the filtered column, getSplits() still returns the block, so the delete logic created an update table status file even though no records were actually deleted. This change also passes the SparkContext into the create database event.
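The one-line fix in DeleteExecution captures the problem: the collected per-block result array is non-empty whenever getSplits() returns blocks, so emptiness has to be tested on the flattened row results instead. A minimal sketch of the distinction, with the element type simplified to String (the real array carries segment update details):

    object FlattenCheck extends App {
      // getSplits() returned two blocks, but the filter matched no rows,
      // so each block contributes an empty result list.
      val res: Array[List[String]] = Array(List(), List())
      println(res.isEmpty)          // false: the old check passes because blocks exist
      println(res.flatten.isEmpty)  // true: the new check, since no rows were deleted
    }

With the old res.isEmpty check the method fell through and wrote an update table status file on every such no-op delete.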

This closes #1957


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/da549c2b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/da549c2b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/da549c2b

Branch: refs/heads/carbonstore-rebase
Commit: da549c2b721ae66f55fd28bebe775ac10373c4e2
Parents: 420cc35
Author: akashrn5 <ak...@gmail.com>
Authored: Thu Feb 8 23:10:43 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Fri Feb 9 19:30:47 2018 +0530

----------------------------------------------------------------------
 .../iud/DeleteCarbonTableTestCase.scala         | 22 +++++++++++++++++++-
 .../events/CreateDatabaseEvents.scala           |  6 +++++-
 .../scala/org/apache/spark/util/FileUtils.scala |  5 +++--
 .../command/mutation/DeleteExecution.scala      |  2 +-
 .../sql/execution/strategy/DDLStrategy.scala    |  2 +-
 5 files changed, 31 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/da549c2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 6521657..22aa385 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -17,9 +17,11 @@
 package org.apache.carbondata.spark.testsuite.iud
 
 import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
 
 class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
@@ -180,6 +182,24 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       Seq(Row(1, "abc"), Row(3, "uhj"), Row(4, "frg")))
   }
 
+  test("test number of update table status files after delete query where no records are deleted") {
+    sql("drop table if exists update_status_files")
+    sql("create table update_status_files(name string,age int) stored by 'carbondata'")
+    sql("insert into update_status_files select 'abc',1")
+    sql("insert into update_status_files select 'def',2")
+    sql("insert into update_status_files select 'xyz',4")
+    sql("insert into update_status_files select 'abc',6")
+    sql("alter table update_status_files compact 'minor'")
+    sql("delete from update_status_files where age=3").show()
+    sql("delete from update_status_files where age=5").show()
+    val carbonTable = CarbonEnv
+      .getCarbonTable(Some("iud_db"), "update_status_files")(sqlContext.sparkSession)
+    val metaPath = carbonTable.getMetaDataFilepath
+    val files = FileFactory.getCarbonFile(metaPath)
+    assert(files.listFiles().length == 2)
+    sql("drop table update_status_files")
+  }
+
 
   override def afterAll {
     sql("use default")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da549c2b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
index dae22b1..c1d79db 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
@@ -17,11 +17,15 @@
 
 package org.apache.carbondata.events
 
+import org.apache.spark.SparkContext
+
 
 case class CreateDatabasePreExecutionEvent(databaseName: String) extends Event
   with DatabaseEventInfo
 
-case class CreateDatabasePostExecutionEvent(databaseName: String, dataBasePath: String)
+case class CreateDatabasePostExecutionEvent(databaseName: String,
+    dataBasePath: String,
+    sparkContext: SparkContext)
   extends Event with DatabaseEventInfo
 
 case class CreateDatabaseAbortExecutionEvent(databaseName: String)

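The new sparkContext field lets listeners of the create-database event reach the running Spark application. A hypothetical listener sketch, not part of this commit: it assumes the OperationEventListener trait and OperationContext from the same events package, and only illustrates how the field could be consumed.

    import org.apache.carbondata.events.{CreateDatabasePostExecutionEvent, Event,
      OperationContext, OperationEventListener}

    // Hypothetical listener, for illustration only.
    class CreateDatabaseLogger extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case e: CreateDatabasePostExecutionEvent =>
            // The event now carries the SparkContext, so a listener can tie the
            // new database directory to the Spark application that created it.
            println(s"Database ${e.databaseName} created at ${e.dataBasePath} " +
              s"by application ${e.sparkContext.applicationId}")
          case _ => // ignore other events
        }
      }
    }

Such a listener would be registered on the OperationListenerBus, the same bus that FileUtils.createDatabaseDirectory fires the event on in the next hunk.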
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da549c2b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
index 95ba318..12ce17f 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import java.io.{File, IOException}
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -106,13 +107,13 @@ object FileUtils {
     }
   }
 
-  def createDatabaseDirectory(dbName: String, storePath: String) {
+  def createDatabaseDirectory(dbName: String, storePath: String, sparkContext: SparkContext) {
     val databasePath: String = storePath + File.separator + dbName.toLowerCase
     val fileType = FileFactory.getFileType(databasePath)
     FileFactory.mkdirs(databasePath, fileType)
     val operationContext = new OperationContext
     val createDatabasePostExecutionEvent = new CreateDatabasePostExecutionEvent(dbName,
-      databasePath)
+      databasePath, sparkContext)
     OperationListenerBus.getInstance.fireEvent(createDatabasePostExecutionEvent, operationContext)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da549c2b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index a8efb84..1ac0b34 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -134,7 +134,7 @@ object DeleteExecution {
     ).collect()
 
     // if no loads are present then no need to do anything.
-    if (res.isEmpty) {
+    if (res.flatten.isEmpty) {
       return segmentsTobeDeleted
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da549c2b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 83831e3..0178716 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -91,7 +91,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           case e: NoSuchDatabaseException =>
             CarbonProperties.getStorePath
         }
-        FileUtils.createDatabaseDirectory(dbName, dbLocation)
+        FileUtils.createDatabaseDirectory(dbName, dbLocation, sparkSession.sparkContext)
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
         ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil