You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/02 13:39:49 UTC

carbondata git commit: [CARBONDATA-2217]fix drop partition for non existing partition and set FactTimeStamp during compaction for partition table

Repository: carbondata
Updated Branches:
  refs/heads/master f74d1efac -> aa910ddb2


[CARBONDATA-2217]fix drop partition for non existing partition and set FactTimeStamp during compaction for partition table

Problem:
1) When drop partition is fired for a column which does not exist, it throws a null pointer exception.
2) select * does not work when the clean files operation is fired after the second level of compaction; it sometimes throws an exception.
3) A new segment is created for all the segments if any one partition is dropped.

Solution:
1) Add a null check for the case where the column does not exist.
2) Give a different timestamp to the fact files during compaction to avoid deletion of files during clean files.
3) A new segment file should be written only for the partition which is dropped, not for all the partitions.
4) This PR also contains a fix for creating a pre-aggregate table with the same name as one that has already been created in another database.

This closes #2017


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/aa910ddb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/aa910ddb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/aa910ddb

Branch: refs/heads/master
Commit: aa910ddb2460d7fa18ff594859391eb888b585b9
Parents: f74d1ef
Author: akashrn5 <ak...@gmail.com>
Authored: Wed Feb 28 17:28:43 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Mar 2 19:09:36 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/SegmentFileStore.java         | 24 +++++++++--------
 .../preaggregate/TestPreAggCreateCommand.scala  | 27 ++++++++++++++++++--
 .../StandardPartitionGlobalSortTestCase.scala   | 13 ++++++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  3 +++
 .../spark/rdd/CarbonTableCompactor.scala        |  3 +++
 .../datamap/CarbonCreateDataMapCommand.scala    |  2 +-
 .../management/CarbonLoadDataCommand.scala      | 21 ++++++++++-----
 7 files changed, 73 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa910ddb/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index b5f5a25..1902ab9 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -371,19 +371,23 @@ public class SegmentFileStore {
       }
       Path path = new Path(location);
       // Update the status to delete if path equals
-      for (PartitionSpec spec : partitionSpecs) {
-        if (path.equals(spec.getLocation())) {
-          entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
-          updateSegment = true;
-          break;
+      if (null != partitionSpecs) {
+        for (PartitionSpec spec : partitionSpecs) {
+          if (path.equals(spec.getLocation())) {
+            entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
+            updateSegment = true;
+            break;
+          }
         }
       }
     }
-    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
-    writePath =
-        writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentNo() + "_" + uniqueId
-            + CarbonTablePath.SEGMENT_EXT;
-    writeSegmentFile(segmentFile, writePath);
+    if (updateSegment) {
+      String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+      writePath =
+          writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentNo() + "_" + uniqueId
+              + CarbonTablePath.SEGMENT_EXT;
+      writeSegmentFile(segmentFile, writePath);
+    }
     // Check whether we can completly remove the segment.
     boolean deleteSegment = true;
     for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa910ddb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 1e59a80..8b71a31 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -34,11 +34,13 @@ import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, M
 class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
+    sql("drop database if exists otherDB cascade")
     sql("drop table if exists PreAggMain")
     sql("drop table if exists PreAggMain1")
     sql("drop table if exists PreAggMain2")
     sql("drop table if exists maintable")
     sql("drop table if exists showTables")
+    sql("drop table if exists Preagg_twodb")
     sql("create table preaggMain (a string, b string, c string) stored by 'carbondata'")
     sql("create table preaggMain1 (a string, b string, c string) stored by 'carbondata' tblProperties('DICTIONARY_INCLUDE' = 'a')")
     sql("create table preaggMain2 (a string, b string, c string) stored by 'carbondata'")
@@ -377,13 +379,32 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }
 
-  test("test show tables filterted with datamaps"){
+  test("test show tables filterted with datamaps") {
     sql("create table showTables(name string, age int) stored by 'carbondata'")
     sql("create datamap preAgg on table showTables using 'preaggregate' as select sum(age) from showTables")
     sql("show tables").show()
     assert(!sql("show tables").collect().contains("showTables_preagg"))
   }
 
+  test("test create main and preagg table of same name in two database") {
+    sql("drop table if exists Preagg_twodb")
+    sql("create table Preagg_twodb(name string, age int) stored by 'carbondata'")
+    sql("create datamap sameName on table Preagg_twodb using 'preaggregate' as select sum(age) from Preagg_twodb")
+    sql("create database otherDB")
+    sql("use otherDB")
+    sql("drop table if exists Preagg_twodb")
+    sql("create table Preagg_twodb(name string, age int) stored by 'carbondata'")
+    try {
+      sql(
+        "create datamap sameName on table Preagg_twodb using 'preaggregate' as select sum(age) from Preagg_twodb")
+      assert(true)
+    } catch {
+      case ex: Exception =>
+        assert(false)
+    }
+    sql("use default")
+  }
+
   def getCarbontable(plan: LogicalPlan) : CarbonTable ={
     var carbonTable : CarbonTable = null
     plan.transform {
@@ -405,12 +426,14 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     carbonTable
   }
 
-  override def afterAll {
+  override def afterAll { 
+    sql("drop database if exists otherDB cascade")
     sql("drop table if exists maintable")
     sql("drop table if exists PreAggMain")
     sql("drop table if exists PreAggMain1")
     sql("drop table if exists PreAggMain2")
     sql("drop table if exists maintabletime")
     sql("drop table if exists showTables")
+    sql("drop table if exists Preagg_twodb")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa910ddb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index 7d0959c..ff062cd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -914,6 +914,17 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
     assert(sql("show segments for table comp_dt2").collect().length == 3)
     assert(sql("select * from comp_dt2").collect().length == 13)
     sql("clean files for table comp_dt2")
+    assert(sql("show segments for table comp_dt2").collect().length == 1)
+    assert(sql("select * from comp_dt2").collect().length == 13)
+  }
+
+  test("test insert into partition column which does not exists") {
+    sql("drop table if exists partitionNoColumn")
+    sql("create table partitionNoColumn (name string, dob date) partitioned by(year int,month int) stored by 'carbondata'")
+    val exMessage = intercept[Exception] {
+      sql("insert into partitionNoColumn partition(year=2014,month=01,day=01) select 'martin','2014-04-07'")
+    }
+    assert(exMessage.getMessage.contains("day is not a valid partition column in table default.partitionnocolumn"))
   }
 
   override def afterAll = {
@@ -976,5 +987,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
     sql("drop table if exists partitiondecimal")
     sql("drop table if exists partitiondecimalstatic")
     sql("drop table if exists partitiondatadelete")
+    sql("drop table if exists comp_dt2")
+    sql("drop table if exists partitionNoColumn")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa910ddb/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1695a13..9985a3a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -544,6 +544,9 @@ object CarbonDataRDDFactory {
       }
       try {
         // compaction handling
+        if (carbonTable.isHivePartitionTable) {
+          carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+        }
         handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable, operationContext)
       } catch {
         case e: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa910ddb/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 07acaa5..a4fa37a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -96,6 +96,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           segList,
           compactionModel.compactionType
         )
+        if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+          carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+        }
       }
       else {
         loadsToMerge.clear()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa910ddb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 0fd5437..0eb7ad5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -53,7 +53,7 @@ case class CarbonCreateDataMapCommand(
       throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
     }
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val dbName = tableIdentifier.database.getOrElse("default")
+    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
     val tableName = tableIdentifier.table + "_" + dataMapName
     val newDmProperties = if (dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).isDefined) {
       dmProperties.updated(TimeSeriesUtil.TIMESERIES_EVENTTIME,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa910ddb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 7800d3e..71b8387 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -563,13 +563,20 @@ case class CarbonLoadDataCommand(
     var partitionsLen = 0
     val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
     val partitionValues = if (partition.nonEmpty) {
-      partition.filter(_._2.nonEmpty).map{ case(col, value) =>
-        val field = catalogTable.schema.find(_.name.equalsIgnoreCase(col)).get
-        CarbonScalaUtil.convertToDateAndTimeFormats(
-          value.get,
-          field.dataType,
-          timeStampFormat,
-          dateFormat)
+      partition.filter(_._2.nonEmpty).map { case (col, value) =>
+        catalogTable.schema.find(_.name.equalsIgnoreCase(col)) match {
+          case Some(c) =>
+            CarbonScalaUtil.convertToDateAndTimeFormats(
+              value.get,
+              c.dataType,
+              timeStampFormat,
+              dateFormat)
+          case None =>
+            throw new AnalysisException(s"$col is not a valid partition column in table ${
+              carbonLoadModel
+                .getDatabaseName
+            }.${ carbonLoadModel.getTableName }")
+        }
       }.toArray
     } else {
       Array[String]()