You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/29 01:55:54 UTC

carbondata git commit: [CARBONDATA-2274] fix for Partition table having more than 4 column giving zero record

Repository: carbondata
Updated Branches:
  refs/heads/master 877eabdd6 -> 3647aee3c


[CARBONDATA-2274] fix for Partition table having more than 4 column giving zero record

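Converting the Array[(String, String)] of partition key/value pairs to an immutable Map[String, String] did not preserve the order of the partition columns, and that sequence was then used to build the partition path. (Scala's immutable Map keeps insertion order only for up to 4 entries; larger maps are hash-based, which is why the problem appeared only with more than 4 partition columns.)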

The fix uses a mutable LinkedHashMap, which preserves insertion order, to avoid the reordering.

This closes #2096


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3647aee3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3647aee3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3647aee3

Branch: refs/heads/master
Commit: 3647aee3cc24a623d982d4d7c6c1de0b06f58370
Parents: 877eabd
Author: rahulforallp <ra...@knoldus.in>
Authored: Fri Mar 23 20:19:43 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Mar 29 07:25:42 2018 +0530

----------------------------------------------------------------------
 .../StandardPartitionTableLoadingTestCase.scala | 21 ++++++++++++++++++++
 .../carbondata/spark/util/CarbonScalaUtil.scala | 18 ++++++++++-------
 .../datasources/SparkCarbonTableFormat.scala    | 18 +++++++++++------
 3 files changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3647aee3/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index baf1627..8342c69 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -135,6 +135,26 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
   }
 
+  test("data loading for partition table for five partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionfive (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int)
+        | PARTITIONED BY (utilization int,salary int,workgroupcategory int, empname String,
+        | designation String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionfive OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_partitionfive", "0", 10)
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionfive order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionfive where empno>15 order by empno "),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where empno>15 order by empno"))
+  }
 
   test("multiple data loading for partition table for three partition column") {
     sql(
@@ -519,6 +539,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql("drop table if exists partitionone")
     sql("drop table if exists partitiontwo")
     sql("drop table if exists partitionthree")
+    sql("drop table if exists partitionfive")
     sql("drop table if exists partitionmultiplethree")
     sql("drop table if exists insertpartitionthree")
     sql("drop table if exists staticpartitionone")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3647aee3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 394ba5f..37cdc41 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -23,6 +23,8 @@ import java.nio.charset.Charset
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.collection.mutable
+
 import com.univocity.parsers.common.TextParsingException
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
@@ -294,13 +296,12 @@ object CarbonScalaUtil {
    * Update partition values as per the right date and time format
    * @return updated partition spec
    */
-  def updatePartitions(
-      partitionSpec: Map[String, String],
-  table: CarbonTable): Map[String, String] = {
+  def updatePartitions(partitionSpec: mutable.LinkedHashMap[String, String],
+      table: CarbonTable): mutable.LinkedHashMap[String, String] = {
     val cacheProvider: CacheProvider = CacheProvider.getInstance
     val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
       cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
-    partitionSpec.map{ case (col, pvalue) =>
+    partitionSpec.map { case (col, pvalue) =>
       // replace special string with empty value.
       val value = if (pvalue == null) {
         hivedefaultpartition
@@ -340,11 +341,14 @@ object CarbonScalaUtil {
   def updatePartitions(
       parts: Seq[CatalogTablePartition],
       table: CarbonTable): Seq[CatalogTablePartition] = {
-    parts.map{ f =>
+    parts.map { f =>
+      val specLinkedMap: mutable.LinkedHashMap[String, String] = mutable.LinkedHashMap
+        .empty[String, String]
+      f.spec.foreach(fSpec => specLinkedMap.put(fSpec._1, fSpec._2))
       val changedSpec =
         updatePartitions(
-          f.spec,
-          table)
+          specLinkedMap,
+          table).toMap
       f.copy(spec = changedSpec)
     }.groupBy(p => p.spec).map(f => f._2.head).toSeq // Avoid duplicates by do groupby
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3647aee3/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 54f861c..9110482 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -276,10 +276,14 @@ private class CarbonOutputWriter(path: String,
     }
   }
   var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
+    val linkedMap = mutable.LinkedHashMap[String, String]()
     val updatedPartitions = partitions.map(splitPartition)
-    (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
+    updatedPartitions.foreach {
+      case (k, v) => linkedMap.put(k, v)
+    }
+    (linkedMap, updatePartitions(updatedPartitions.map(_._2)))
   } else {
-    (Map.empty[String, String].toArray, Array.empty)
+    (mutable.LinkedHashMap.empty[String, String], Array.empty)
   }
 
   private def splitPartition(p: String) = {
@@ -305,8 +309,10 @@ private class CarbonOutputWriter(path: String,
       val index = currPartitions.indexOf(writeSpec)
       if (index > -1) {
         val spec = currPartitions.get(index)
-        updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
-        partitionData = updatePartitions(updatedPartitions.map(_._2))
+        spec.getPartitions.asScala.map(splitPartition).foreach {
+          case (k, v) => updatedPartitions.put(k, v)
+        }
+        partitionData = updatePartitions(updatedPartitions.map(_._2).toSeq)
       }
     }
     updatedPath
@@ -393,7 +399,7 @@ private class CarbonOutputWriter(path: String,
     val formattedPartitions =
     // All dynamic partitions need to be converted to proper format
       CarbonScalaUtil.updatePartitions(
-        updatedPartitions.toMap,
+        updatedPartitions.asInstanceOf[mutable.LinkedHashMap[String, String]],
         model.getCarbonDataLoadSchema.getCarbonTable)
     formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
     SegmentFileStore.writeSegmentFile(
@@ -411,7 +417,7 @@ private class CarbonOutputWriter(path: String,
       val formattedPartitions =
       // All dynamic partitions need to be converted to proper format
         CarbonScalaUtil.updatePartitions(
-          updatedPartitions.toMap,
+          updatedPartitions.asInstanceOf[mutable.LinkedHashMap[String, String]],
           model.getCarbonDataLoadSchema.getCarbonTable)
       val partitionstr = formattedPartitions.map{p =>
         ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2)