You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original archive page) is not preserved in this copy.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/03/02 14:52:45 UTC
carbondata git commit: [CARBONDATA-2219] Added validation for external partition location to use same schema.
Repository: carbondata
Updated Branches:
refs/heads/master ac30e3e72 -> 7bfe4afe4
[CARBONDATA-2219] Added validation for external partition location to use same schema.
This closes #2018
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7bfe4afe
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7bfe4afe
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7bfe4afe
Branch: refs/heads/master
Commit: 7bfe4afe4610ac51170883cc3660acb6b2700e75
Parents: ac30e3e
Author: ravipesala <ra...@gmail.com>
Authored: Thu Mar 1 12:04:53 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Mar 2 20:22:09 2018 +0530
----------------------------------------------------------------------
.../blockletindex/SegmentIndexFileStore.java | 13 +++--
.../core/metadata/SegmentFileStore.java | 27 +++++++++-
.../examples/CarbonPartitionExample.scala | 3 +-
.../StandardPartitionTableQueryTestCase.scala | 57 +++++++++++++++-----
...arbonAlterTableAddHivePartitionCommand.scala | 18 ++++++-
5 files changed, 94 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7bfe4afe/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index b88c1f4..4883d94 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -90,23 +90,22 @@ public class SegmentIndexFileStore {
/**
* Read all index files and keep the cache in it.
*
- * @param segmentFileStore
+ * @param segmentFile
* @throws IOException
*/
- public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status,
- boolean ignoreStatus) throws IOException {
+ public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, String tablePath,
+ SegmentStatus status, boolean ignoreStatus) throws IOException {
List<CarbonFile> carbonIndexFiles = new ArrayList<>();
- if (segmentFileStore.getLocationMap() == null) {
+ if (segmentFile == null) {
return;
}
- for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore
+ for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFile
.getLocationMap().entrySet()) {
String location = locations.getKey();
if (locations.getValue().getStatus().equals(status.getMessage()) || ignoreStatus) {
if (locations.getValue().isRelative()) {
- location =
- segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location;
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
}
for (String indexFile : locations.getValue().getFiles()) {
CarbonFile carbonFile = FileFactory
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7bfe4afe/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 1902ab9..f2548b5 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
@@ -314,7 +315,7 @@ public class SegmentFileStore {
}
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
indexFilesMap = new HashMap<>();
- indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+ indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, ignoreStatus);
Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
@@ -329,6 +330,30 @@ public class SegmentFileStore {
}
/**
+ * Reads all index files and get the schema of each index file
+ * @throws IOException
+ */
+ public static Map<String, List<ColumnSchema>> getSchemaFiles(SegmentFile segmentFile,
+ String tablePath) throws IOException {
+ Map<String, List<ColumnSchema>> schemaMap = new HashMap<>();
+ if (segmentFile == null) {
+ return schemaMap;
+ }
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ indexFileStore.readAllIIndexOfSegment(segmentFile, tablePath, SegmentStatus.SUCCESS, true);
+ Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+ List<DataFileFooter> indexInfo =
+ fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+ if (indexInfo.size() > 0) {
+ schemaMap.put(entry.getKey(), indexInfo.get(0).getColumnInTable());
+ }
+ }
+ return schemaMap;
+ }
+
+ /**
* Gets all index files from this segment
* @return
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7bfe4afe/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
index 6837c56..2391dbe 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
object CarbonPartitionExample {
@@ -195,7 +196,7 @@ object CarbonPartitionExample {
try {
spark.sql("""SHOW PARTITIONS t1""").show(100, false)
} catch {
- case ex: AnalysisException => LOGGER.error(ex.getMessage())
+ case ex: ProcessMetaDataException => LOGGER.error(ex.getMessage())
}
spark.sql("""SHOW PARTITIONS t0""").show(100, false)
spark.sql("""SHOW PARTITIONS t3""").show(100, false)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7bfe4afe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 918bbff..58eb9f9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -290,15 +290,17 @@ test("Creation of partition table should fail if the colname in table schema and
val location = metastoredb +"/" +"ravi"
sql(s"""alter table staticpartitionlocload add partition (empname='ravi') location '$location'""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
- val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionlocload")
+ val frame = sql("select count(empno) from staticpartitionlocload")
verifyPartitionInfo(frame, Seq("empname=ravi"))
- assert(frame.count() == 10)
+ checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(10)))
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(20)))
val file = FileFactory.getCarbonFile(location)
assert(file.exists())
FileFactory.deleteAllCarbonFilesOfDir(file)
}
- test("add external partition with static column partition with load command") {
+ test("add external partition with static column partition with load command with diffrent schema") {
sql(
"""
@@ -324,18 +326,43 @@ test("Creation of partition table should fail if the colname in table schema and
| PARTITIONED BY (empname String)
| STORED BY 'org.apache.carbondata.format'
""".stripMargin)
- sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""")
- val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
- verifyPartitionInfo(frame, Seq("empname=ravi"))
- assert(frame.count() == 10)
- val location2 = storeLocation +"/staticpartitionlocloadother/empname=indra"
- sql(s"""alter table staticpartitionextlocload add partition (empname='indra') location '$location2'""")
- val frame1 = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
- verifyPartitionInfo(frame1, Seq("empname=indra"))
- assert(frame1.count() == 20)
+ intercept[Exception] {
+ sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""")
+ }
+ assert(sql(s"show partitions staticpartitionextlocload").count() == 0)
val file = FileFactory.getCarbonFile(location)
- assert(file.exists())
- FileFactory.deleteAllCarbonFilesOfDir(file)
+ if(file.exists()) {
+ FileFactory.deleteAllCarbonFilesOfDir(file)
+ }
+ }
+
+ test("add external partition with static column partition with load command") {
+
+ sql(
+ """
+ | CREATE TABLE staticpartitionlocloadother_new (empno int, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
+ | projectjoindate Timestamp,attendance int,
+ | deptname String,projectcode int,
+ | utilization int,salary int,projectenddate Date,doj Timestamp)
+ | PARTITIONED BY (empname String)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ val location = metastoredb +"/" +"ravi1"
+ sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='indra') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))
+ sql(s"""ALTER TABLE staticpartitionlocloadother_new DROP PARTITION(empname='ravi')""")
+ checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(10)))
+ sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""")
+ checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(30)))
+ val file = FileFactory.getCarbonFile(location)
+ if(file.exists()) {
+ FileFactory.deleteAllCarbonFilesOfDir(file)
+ }
}
test("drop partition on preAggregate table should fail"){
@@ -387,6 +414,8 @@ test("Creation of partition table should fail if the colname in table schema and
sql("drop table if exists staticpartitionlocload")
sql("drop table if exists staticpartitionextlocload")
sql("drop table if exists staticpartitionlocloadother")
+ sql("drop table if exists staticpartitionextlocload_new")
+ sql("drop table if exists staticpartitionlocloadother_new")
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7bfe4afe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 2aaecc7..b0e6b94 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -75,7 +75,12 @@ case class CarbonAlterTableAddHivePartitionCommand(
override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
- AlterTableDropPartitionCommand(tableName, partitionSpecsAndLocs.map(_._1), true, false, true)
+ AlterTableDropPartitionCommand(
+ tableName,
+ partitionSpecsAndLocs.map(_._1),
+ ifExists = true,
+ purge = false,
+ retainData = true).run(sparkSession)
val msg = s"Got exception $exception when processing data of add partition." +
"Dropping partitions to the metadata"
LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
@@ -88,6 +93,17 @@ case class CarbonAlterTableAddHivePartitionCommand(
val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table.getTablePath,
partitionSpecsAndLocsTobeAdded)
if (segmentFile != null) {
+ val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
+ val tableColums = table.getTableInfo.getFactTable.getListOfColumns.asScala
+ var isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) =>
+ columnSchemas.asScala.exists { col =>
+ tableColums.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId))
+ } && columnSchemas.size() == tableColums.length
+ }
+ if (!isSameSchema) {
+ throw new UnsupportedOperationException(
+ "Schema of index files located in location is not matching with current table schema")
+ }
val loadModel = new CarbonLoadModel
loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
// Create new entry in tablestatus file