You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/10/23 08:33:55 UTC

carbondata git commit: [CARBONDATA-3028][32k] Fix bugs in spark file format table with blanks in longstringcolumns

Repository: carbondata
Updated Branches:
  refs/heads/master 682cac85a -> c7c83684b


[CARBONDATA-3028][32k] Fix bugs in spark file format table with blanks in longstringcolumns

If we create a spark file format table with multiple longstringcolumns
and the option long_string_columns contains blank characters, the
query on that table will fail, cause it didn't recognize the correct
varchar columns. The root cause is that carbondata didn't trim the blank
in long_string_columns while it recognizing the varchar columns.

This closes #2834


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c7c83684
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c7c83684
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c7c83684

Branch: refs/heads/master
Commit: c7c83684b0237e3eec9d6c3af904293e207e733a
Parents: 682cac8
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Fri Oct 19 10:37:23 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Tue Oct 23 16:32:24 2018 +0800

----------------------------------------------------------------------
 ...tCreateTableUsingSparkCarbonFileFormat.scala | 38 ++++++++++----------
 .../sdk/file/CarbonWriterBuilder.java           | 25 +++++++------
 2 files changed, 33 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7c83684/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 755a7df..250e9a6 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.carbondata.datasource
 
 import java.io.File
 import java.text.SimpleDateFormat
-import java.util
 import java.util.{Date, Random}
 
 import scala.collection.JavaConverters._
@@ -30,17 +29,14 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.apache.spark.util.SparkUtil
 import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonV3DataFormatConstants}
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter}
 import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
@@ -433,14 +429,16 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
       .append("[ \n")
       .append("   {\"name\":\"string\"},\n")
       .append("   {\"address\":\"varchar\"},\n")
-      .append("   {\"age\":\"int\"}\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"note\":\"varchar\"}\n")
       .append("]")
       .toString()
     val builder = CarbonWriter.builder()
     val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
-    for (i <- 0 until 3) {
+    val totalRecordsNum = 3
+    for (i <- 0 until totalRecordsNum) {
       // write a varchar with 75,000 length
-      writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
+      writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString, RandomStringUtils.randomAlphabetic(75000)))
     }
     writer.close()
 
@@ -449,19 +447,19 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
     if (spark.sparkContext.version.startsWith("2.1")) {
       //data source file format
       spark.sql(
-        s"""CREATE TABLE sdkOutputTable (name string, address string, age int)
-           |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """
+        s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string)
+           |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address, note") """
           .stripMargin)
     } else {
       //data source file format
       spark.sql(
-        s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
-           |OPTIONS("long_String_columns"="address") LOCATION
+        s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string) USING carbon
+           |OPTIONS("long_String_columns"="address, note") LOCATION
            |'$writerPath' """.stripMargin)
     }
-    assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1)
-    val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList()
-    assert(op.get(0).getString(0).length == 75000)
+    checkAnswer(spark.sql("select count(*) from sdkOutputTable where age = 0"), Seq(Row(1)))
+    checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTable where length(address)=75000 and length(note)=75000)"),
+      Seq(Row(totalRecordsNum)))
     spark.sql("DROP TABLE sdkOutputTable")
 
     //--------------- data source external table without schema ---------------------------
@@ -471,16 +469,16 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
       spark
         .sql(
           s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
-             |'$writerPath', "long_String_columns" "address") """.stripMargin)
+             |'$writerPath', "long_String_columns" "address, note") """.stripMargin)
     } else {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
-           |("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin)
+           |("long_String_columns"="address, note") LOCATION '$writerPath' """.stripMargin)
     }
-    assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1)
-    val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList()
-    assert(op1.get(0).getString(0).length == 75000)
+    checkAnswer(spark.sql("select count(*) from sdkOutputTableWithoutSchema where age = 0"), Seq(Row(1)))
+    checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTableWithoutSchema where length(address)=75000 and length(note)=75000)"),
+      Seq(Row(totalRecordsNum)))
     spark.sql("DROP TABLE sdkOutputTableWithoutSchema")
     clearDataMapCache
     cleanTestData()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7c83684/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 87930f6..ed2c956 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -412,12 +413,17 @@ public class CarbonWriterBuilder {
   public CarbonLoadModel buildLoadModel(Schema carbonSchema)
       throws IOException, InvalidLoadOptionException {
     timestamp = System.nanoTime();
-    Set<String> longStringColumns = null;
-    if (options != null && options.get("long_string_columns") != null) {
-      longStringColumns =
-          new HashSet<>(Arrays.asList(options.get("long_string_columns").toLowerCase().split(",")));
+    // validate long_string_column
+    Set<String> longStringColumns = new HashSet<>();
+    if (options != null && options.get(CarbonCommonConstants.LONG_STRING_COLUMNS) != null) {
+      String[] specifiedLongStrings =
+          options.get(CarbonCommonConstants.LONG_STRING_COLUMNS).toLowerCase().split(",");
+      for (String str : specifiedLongStrings) {
+        longStringColumns.add(str.trim());
+      }
       validateLongStringColumns(carbonSchema, longStringColumns);
     }
+    // for the longstring field, change the datatype from string to varchar
     this.schema = updateSchemaFields(carbonSchema, longStringColumns);
     // build CarbonTable using schema
     CarbonTable table = buildCarbonTable();
@@ -603,12 +609,11 @@ public class CarbonWriterBuilder {
     for (int i = 0; i < fields.length; i++) {
       if (fields[i] != null) {
         fields[i].updateNameToLowerCase();
-      }
-
-      if (longStringColumns != null) {
-        /* Also update the string type to varchar */
-        if (longStringColumns.contains(fields[i].getFieldName())) {
-          fields[i].updateDataTypeToVarchar();
+        if (longStringColumns != null) {
+          /* Also update the string type to varchar */
+          if (longStringColumns.contains(fields[i].getFieldName())) {
+            fields[i].updateDataTypeToVarchar();
+          }
         }
       }
     }