Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/29 01:55:54 UTC
carbondata git commit: [CARBONDATA-2274] fix for partition table having more than 4 columns giving zero records
Repository: carbondata
Updated Branches:
refs/heads/master 877eabdd6 -> 3647aee3c
[CARBONDATA-2274] Fix for partition table having more than 4 columns giving zero records
Converting an Array[(String, String)] to a Map[String, String] did not preserve the order of the partition columns, and that sequence was used to build the partition path. A LinkedHashMap is now used instead to avoid the reordering (see the sketch below).
This closes #2096
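
A minimal standalone sketch (not part of this patch; the values are illustrative) of why only tables with more than four partition columns were affected: Scala's immutable Map keeps insertion order for up to four entries (the specialized Map1 to Map4 classes) but switches to a HashMap for five or more, whose iteration order follows the hash layout instead:

import scala.collection.mutable

object PartitionOrderSketch {
  def main(args: Array[String]): Unit = {
    // Five partition columns, mirroring the partitionfive test below
    // (column values are made up for illustration).
    val parts = Array(
      "utilization" -> "99",
      "salary" -> "5040",
      "workgroupcategory" -> "1",
      "empname" -> "bill",
      "designation" -> "SE")

    // toMap on five entries builds an immutable HashMap; its iteration
    // order follows the hash layout and need not match insertion order.
    println(parts.toMap.map { case (k, v) => s"$k=$v" }.mkString("/"))

    // LinkedHashMap preserves insertion order at any size, so the path
    // segments always come out in the declared partition column order.
    val linked = mutable.LinkedHashMap(parts: _*)
    println(linked.map { case (k, v) => s"$k=$v" }.mkString("/"))
  }
}

With five or more entries the first println may emit the path segments in hash order, while the LinkedHashMap version always matches the declared partition column order.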
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3647aee3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3647aee3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3647aee3
Branch: refs/heads/master
Commit: 3647aee3cc24a623d982d4d7c6c1de0b06f58370
Parents: 877eabd
Author: rahulforallp <ra...@knoldus.in>
Authored: Fri Mar 23 20:19:43 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Mar 29 07:25:42 2018 +0530
----------------------------------------------------------------------
.../StandardPartitionTableLoadingTestCase.scala | 21 ++++++++++++++++++++
.../carbondata/spark/util/CarbonScalaUtil.scala | 18 ++++++++++-------
.../datasources/SparkCarbonTableFormat.scala | 18 +++++++++++------
3 files changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3647aee3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index baf1627..8342c69 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -135,6 +135,26 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
}
+ test("data loading for partition table for five partition column") {
+ sql(
+ """
+ | CREATE TABLE partitionfive (empno int, doj Timestamp,
+ | workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int)
+ | PARTITIONED BY (utilization int,salary int,workgroupcategory int, empname String,
+ | designation String)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionfive OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ validateDataFiles("default_partitionfive", "0", 10)
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionfive order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionfive where empno>15 order by empno "),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where empno>15 order by empno"))
+ }
test("multiple data loading for partition table for three partition column") {
sql(
@@ -519,6 +539,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
sql("drop table if exists partitionone")
sql("drop table if exists partitiontwo")
sql("drop table if exists partitionthree")
+ sql("drop table if exists partitionfive")
sql("drop table if exists partitionmultiplethree")
sql("drop table if exists insertpartitionthree")
sql("drop table if exists staticpartitionone")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3647aee3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 394ba5f..37cdc41 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -23,6 +23,8 @@ import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util.Date
+import scala.collection.mutable
+
import com.univocity.parsers.common.TextParsingException
import org.apache.spark.SparkException
import org.apache.spark.sql._
@@ -294,13 +296,12 @@ object CarbonScalaUtil {
* Update partition values as per the right date and time format
* @return updated partition spec
*/
- def updatePartitions(
- partitionSpec: Map[String, String],
- table: CarbonTable): Map[String, String] = {
+ def updatePartitions(partitionSpec: mutable.LinkedHashMap[String, String],
+ table: CarbonTable): mutable.LinkedHashMap[String, String] = {
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
- partitionSpec.map{ case (col, pvalue) =>
+ partitionSpec.map { case (col, pvalue) =>
// replace special string with empty value.
val value = if (pvalue == null) {
hivedefaultpartition
@@ -340,11 +341,14 @@ object CarbonScalaUtil {
def updatePartitions(
parts: Seq[CatalogTablePartition],
table: CarbonTable): Seq[CatalogTablePartition] = {
- parts.map{ f =>
+ parts.map { f =>
+ val specLinkedMap: mutable.LinkedHashMap[String, String] = mutable.LinkedHashMap
+ .empty[String, String]
+ f.spec.foreach(fSpec => specLinkedMap.put(fSpec._1, fSpec._2))
val changedSpec =
updatePartitions(
- f.spec,
- table)
+ specLinkedMap,
+ table).toMap
f.copy(spec = changedSpec)
}.groupBy(p => p.spec).map(f => f._2.head).toSeq // Avoid duplicates by grouping on the spec
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3647aee3/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 54f861c..9110482 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -276,10 +276,14 @@ private class CarbonOutputWriter(path: String,
}
}
var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
+ val linkedMap = mutable.LinkedHashMap[String, String]()
val updatedPartitions = partitions.map(splitPartition)
- (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
+ updatedPartitions.foreach {
+ case (k, v) => linkedMap.put(k, v)
+ }
+ (linkedMap, updatePartitions(updatedPartitions.map(_._2)))
} else {
- (Map.empty[String, String].toArray, Array.empty)
+ (mutable.LinkedHashMap.empty[String, String], Array.empty)
}
private def splitPartition(p: String) = {
@@ -305,8 +309,10 @@ private class CarbonOutputWriter(path: String,
val index = currPartitions.indexOf(writeSpec)
if (index > -1) {
val spec = currPartitions.get(index)
- updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
- partitionData = updatePartitions(updatedPartitions.map(_._2))
+ spec.getPartitions.asScala.map(splitPartition).foreach {
+ case (k, v) => updatedPartitions.put(k, v)
+ }
+ partitionData = updatePartitions(updatedPartitions.map(_._2).toSeq)
}
}
updatedPath
@@ -393,7 +399,7 @@ private class CarbonOutputWriter(path: String,
val formattedPartitions =
// All dynamic partitions need to be converted to proper format
CarbonScalaUtil.updatePartitions(
- updatedPartitions.toMap,
+ updatedPartitions.asInstanceOf[mutable.LinkedHashMap[String, String]],
model.getCarbonDataLoadSchema.getCarbonTable)
formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
SegmentFileStore.writeSegmentFile(
@@ -411,7 +417,7 @@ private class CarbonOutputWriter(path: String,
val formattedPartitions =
// All dynamic partitions need to be converted to proper format
CarbonScalaUtil.updatePartitions(
- updatedPartitions.toMap,
+ updatedPartitions.asInstanceOf[mutable.LinkedHashMap[String, String]],
model.getCarbonDataLoadSchema.getCarbonTable)
val partitionstr = formattedPartitions.map{p =>
ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2)