Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/03/02 14:52:45 UTC

carbondata git commit: [CARBONDATA-2219] Added validation for external partition location to use same schema.

Repository: carbondata
Updated Branches:
  refs/heads/master ac30e3e72 -> 7bfe4afe4


[CARBONDATA-2219] Added validation for external partition location to use same schema.

This closes #2018
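
With this change, ALTER TABLE ... ADD PARTITION ... LOCATION verifies that any
CarbonData index files already present at the given location were written with
the same column schema as the target table, and rejects the partition when the
schemas differ. A minimal sketch of the user-visible effect, assuming a
hypothetical table t_target and a location previously written with a different
schema (names are illustrative, not from this commit):

    // Hedged sketch: the location holds index files written with another schema.
    intercept[Exception] {
      sql("alter table t_target add partition (empname='ravi') " +
        "location '/tmp/otherschema/empname=ravi'")
    }
    // The partition must not have been added.
    assert(sql("show partitions t_target").count() == 0)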


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7bfe4afe
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7bfe4afe
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7bfe4afe

Branch: refs/heads/master
Commit: 7bfe4afe4610ac51170883cc3660acb6b2700e75
Parents: ac30e3e
Author: ravipesala <ra...@gmail.com>
Authored: Thu Mar 1 12:04:53 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Mar 2 20:22:09 2018 +0530

----------------------------------------------------------------------
 .../blockletindex/SegmentIndexFileStore.java    | 13 +++--
 .../core/metadata/SegmentFileStore.java         | 27 +++++++++-
 .../examples/CarbonPartitionExample.scala       |  3 +-
 .../StandardPartitionTableQueryTestCase.scala   | 57 +++++++++++++++-----
 ...arbonAlterTableAddHivePartitionCommand.scala | 18 ++++++-
 5 files changed, 94 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7bfe4afe/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index b88c1f4..4883d94 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -90,23 +90,22 @@ public class SegmentIndexFileStore {
   /**
   * Read all index files and cache their contents.
    *
-   * @param segmentFileStore
+   * @param segmentFile
    * @throws IOException
    */
-  public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status,
-      boolean ignoreStatus) throws IOException {
+  public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, String tablePath,
+      SegmentStatus status, boolean ignoreStatus) throws IOException {
     List<CarbonFile> carbonIndexFiles = new ArrayList<>();
-    if (segmentFileStore.getLocationMap() == null) {
+    if (segmentFile == null) {
       return;
     }
-    for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore
+    for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFile
         .getLocationMap().entrySet()) {
       String location = locations.getKey();
 
       if (locations.getValue().getStatus().equals(status.getMessage()) || ignoreStatus) {
         if (locations.getValue().isRelative()) {
-          location =
-              segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location;
+          location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
         }
         for (String indexFile : locations.getValue().getFiles()) {
           CarbonFile carbonFile = FileFactory
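
The method now takes the deserialized SegmentFile plus the table path instead
of a whole SegmentFileStore, so callers holding only those two pieces can use
it directly. A minimal calling sketch, assuming segmentFile and tablePath are
already in hand (variable names are illustrative):

    // Hedged sketch of the new call shape; arguments mirror the signature above.
    val indexFileStore = new SegmentIndexFileStore()
    indexFileStore.readAllIIndexOfSegment(segmentFile, tablePath, SegmentStatus.SUCCESS, false)
    val carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath()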

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7bfe4afe/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 1902ab9..f2548b5 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
@@ -314,7 +315,7 @@ public class SegmentFileStore {
     }
     SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
     indexFilesMap = new HashMap<>();
-    indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+    indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, ignoreStatus);
     Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
     for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
@@ -329,6 +330,30 @@ public class SegmentFileStore {
   }
 
   /**
+   * Reads all index files and gets the schema of each index file
+   * @throws IOException
+   */
+  public static Map<String, List<ColumnSchema>> getSchemaFiles(SegmentFile segmentFile,
+      String tablePath) throws IOException {
+    Map<String, List<ColumnSchema>> schemaMap = new HashMap<>();
+    if (segmentFile == null) {
+      return schemaMap;
+    }
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    indexFileStore.readAllIIndexOfSegment(segmentFile, tablePath, SegmentStatus.SUCCESS, true);
+    Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+      List<DataFileFooter> indexInfo =
+          fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+      if (indexInfo.size() > 0) {
+        schemaMap.put(entry.getKey(), indexInfo.get(0).getColumnInTable());
+      }
+    }
+    return schemaMap;
+  }
+
+  /**
    * Gets all index files from this segment
    * @return
    */
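
getSchemaFiles gives callers a per-index-file view of the column schema, which
the ALTER TABLE ADD PARTITION command below uses for its validation. A minimal
usage sketch (assumes scala.collection.JavaConverters._ is imported; variable
names are illustrative):

    // Hedged sketch: inspect the schema recorded in each index file of a segment.
    val schemaMap = SegmentFileStore.getSchemaFiles(segmentFile, tablePath)
    schemaMap.asScala.foreach { case (indexFilePath, columnSchemas) =>
      println(s"$indexFilePath -> ${columnSchemas.size()} columns")
    }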

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7bfe4afe/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
index 6837c56..2391dbe 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 object CarbonPartitionExample {
 
@@ -195,7 +196,7 @@ object CarbonPartitionExample {
     try {
       spark.sql("""SHOW PARTITIONS t1""").show(100, false)
     } catch {
-      case ex: AnalysisException => LOGGER.error(ex.getMessage())
+      case ex: ProcessMetaDataException => LOGGER.error(ex.getMessage())
     }
     spark.sql("""SHOW PARTITIONS t0""").show(100, false)
     spark.sql("""SHOW PARTITIONS t3""").show(100, false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7bfe4afe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 918bbff..58eb9f9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -290,15 +290,17 @@ test("Creation of partition table should fail if the colname in table schema and
     val location = metastoredb +"/" +"ravi"
     sql(s"""alter table staticpartitionlocload add partition (empname='ravi') location '$location'""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-    val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionlocload")
+    val frame = sql("select count(empno) from staticpartitionlocload")
     verifyPartitionInfo(frame, Seq("empname=ravi"))
-    assert(frame.count() == 10)
+    checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(10)))
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(20)))
     val file = FileFactory.getCarbonFile(location)
     assert(file.exists())
     FileFactory.deleteAllCarbonFilesOfDir(file)
   }
 
-  test("add external partition with static column partition with load command") {
+  test("add external partition with static column partition with load command with diffrent schema") {
 
     sql(
       """
@@ -324,18 +326,43 @@ test("Creation of partition table should fail if the colname in table schema and
         | PARTITIONED BY (empname String)
         | STORED BY 'org.apache.carbondata.format'
       """.stripMargin)
-    sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""")
-    val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
-    verifyPartitionInfo(frame, Seq("empname=ravi"))
-    assert(frame.count() == 10)
-    val location2 = storeLocation +"/staticpartitionlocloadother/empname=indra"
-    sql(s"""alter table staticpartitionextlocload add partition (empname='indra') location '$location2'""")
-    val frame1 = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
-    verifyPartitionInfo(frame1, Seq("empname=indra"))
-    assert(frame1.count() == 20)
+    intercept[Exception] {
+      sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""")
+    }
+    assert(sql(s"show partitions staticpartitionextlocload").count() == 0)
     val file = FileFactory.getCarbonFile(location)
-    assert(file.exists())
-    FileFactory.deleteAllCarbonFilesOfDir(file)
+    if (file.exists()) {
+      FileFactory.deleteAllCarbonFilesOfDir(file)
+    }
+  }
+
+  test("add external partition with static column partition with load command") {
+
+    sql(
+      """
+        | CREATE TABLE staticpartitionlocloadother_new (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int,projectenddate Date,doj Timestamp)
+        | PARTITIONED BY (empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    val location = metastoredb +"/" +"ravi1"
+    sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='indra') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))
+    sql(s"""ALTER TABLE staticpartitionlocloadother_new DROP PARTITION(empname='ravi')""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(10)))
+    sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(30)))
+    val file = FileFactory.getCarbonFile(location)
+    if (file.exists()) {
+      FileFactory.deleteAllCarbonFilesOfDir(file)
+    }
   }
 
   test("drop partition on preAggregate table should fail"){
@@ -387,6 +414,8 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists staticpartitionlocload")
     sql("drop table if exists staticpartitionextlocload")
     sql("drop table if exists staticpartitionlocloadother")
+    sql("drop table if exists staticpartitionextlocload_new")
+    sql("drop table if exists staticpartitionlocloadother_new")
   }
 
 }
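
The second test above also pins down the round trip that stays legal: dropping
a partition leaves its files on disk, and re-adding the same location with the
same schema brings the rows back. Condensed sketch of that flow, using the
table and location from the test:

    sql("alter table staticpartitionlocloadother_new drop partition (empname='ravi')")
    // rows of the dropped partition disappear from query results ...
    sql(s"alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'")
    // ... and reappear once the same-schema location is re-mounted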

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7bfe4afe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 2aaecc7..b0e6b94 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -75,7 +75,12 @@ case class CarbonAlterTableAddHivePartitionCommand(
 
 
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
-    AlterTableDropPartitionCommand(tableName, partitionSpecsAndLocs.map(_._1), true, false, true)
+    AlterTableDropPartitionCommand(
+      tableName,
+      partitionSpecsAndLocs.map(_._1),
+      ifExists = true,
+      purge = false,
+      retainData = true).run(sparkSession)
     val msg = s"Got exception $exception when processing data of add partition." +
               "Dropping partitions to the metadata"
     LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
@@ -88,6 +93,17 @@ case class CarbonAlterTableAddHivePartitionCommand(
       val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table.getTablePath,
         partitionSpecsAndLocsTobeAdded)
       if (segmentFile != null) {
+        val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
+        val tableColumns = table.getTableInfo.getFactTable.getListOfColumns.asScala
+        val isSameSchema = indexToSchemas.asScala.exists { case (key, columnSchemas) =>
+          columnSchemas.asScala.exists { col =>
+            tableColumns.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId))
+          } && columnSchemas.size() == tableColumns.length
+        }
+        if (!isSameSchema) {
+          throw new UnsupportedOperationException(
+            "Schema of index files located in location is not matching with current table schema")
+        }
         val loadModel = new CarbonLoadModel
         loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
         // Create new entry in tablestatus file
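
For reference, the essence of the added validation as a standalone helper (a
sketch under simplified assumptions, not the committed code; it matches the
commit's check of requiring at least one index file whose column count and
column unique ids line up with the table, though a stricter variant could
demand that every index file matches):

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

    // Hedged sketch: compare each index file's columns against the table's
    // columns by column unique id; the column counts must also agree.
    def schemasMatch(
        indexToSchemas: java.util.Map[String, java.util.List[ColumnSchema]],
        tableColumns: Seq[ColumnSchema]): Boolean = {
      indexToSchemas.asScala.exists { case (_, columnSchemas) =>
        columnSchemas.asScala.exists { col =>
          tableColumns.exists(_.getColumnUniqueId.equals(col.getColumnUniqueId))
        } && columnSchemas.size() == tableColumns.length
      }
    }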