Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/13 14:33:11 UTC

[carbondata] branch master updated: [CARBONDATA-3356] Support decimal for JSON schema and provide a better exception to help users solve the problem when CarbonData DataSource reads SDK files with varchar

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 541a7bd  [CARBONDATA-3356] Support decimal for JSON schema and provide a better exception to help users solve the problem when CarbonData DataSource reads SDK files with varchar
541a7bd is described below

commit 541a7bd86c8083b5b848168d2b17d3cad63692f2
Author: xubo245 <xu...@huawei.com>
AuthorDate: Mon Apr 29 11:00:14 2019 +0800

    [CARBONDATA-3356] Support decimal for JSON schema and provide a better exception
    to help users solve the problem when CarbonData DataSource reads SDK files with varchar
    
    Support decimal via JSON schema and refactor exceptions
    
    This closes #3181
---
 .../carbondata/core/util/BlockletDataMapUtil.java  | 13 +++-
 docs/carbon-as-spark-datasource-guide.md           | 15 ++--
 .../datasource/SparkCarbonDataSourceTest.scala     | 79 ++++++++++++++++++++++
 .../java/org/apache/carbondata/sdk/file/Field.java | 20 ++++++
 .../carbondata/sdk/file/CSVCarbonWriterTest.java   | 66 ++++++++++++++++++
 5 files changed, 185 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 68aad72..9074587 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -257,13 +257,24 @@ public class BlockletDataMapUtil {
    * name but with different dataType.
    */
   public static boolean isSameColumnAndDifferentDatatypeInSchema(
-      List<ColumnSchema> indexFileColumnList, List<ColumnSchema> tableColumnList) {
+      List<ColumnSchema> indexFileColumnList, List<ColumnSchema> tableColumnList)
+      throws IOException {
     for (int i = 0; i < tableColumnList.size(); i++) {
       for (int j = 0; j < indexFileColumnList.size(); j++) {
         if (indexFileColumnList.get(j).getColumnName()
             .equalsIgnoreCase(tableColumnList.get(i).getColumnName()) && !indexFileColumnList.get(j)
             .getDataType().getName()
             .equalsIgnoreCase(tableColumnList.get(i).getDataType().getName())) {
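+          // The SDK writes varchar columns, but Spark's datasource maps them to string;
+          // point the user at long_string_columns instead of a generic datatype mismatch.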
+          if ("varchar".equalsIgnoreCase(indexFileColumnList.get(j).getDataType().getName()) &&
+              "string".equalsIgnoreCase(tableColumnList.get(i).getDataType().getName())) {
+            throw new IOException("Datatype of the column "
+                + indexFileColumnList.get(j).getColumnName()
+                + " present in the index file is varchar, which is not the same as the datatype "
+                + "of the column with the same name present in the table, "
+                + "because carbon converts its varchar to spark's string. "
+                + "Please set long_string_columns for the varchar column: "
+                + tableColumnList.get(i).getColumnName());
+          }
           LOG.error("Datatype of the Column " + indexFileColumnList.get(j).getColumnName()
               + " present in index file, is not same as datatype of the column with same name"
               + "present in table");
diff --git a/docs/carbon-as-spark-datasource-guide.md b/docs/carbon-as-spark-datasource-guide.md
index 66338f1..b61bf43 100644
--- a/docs/carbon-as-spark-datasource-guide.md
+++ b/docs/carbon-as-spark-datasource-guide.md
@@ -42,19 +42,20 @@ Now you can create Carbon table using Spark's datasource DDL syntax.
 
 | Property | Default Value | Description |
 |-----------|--------------|------------|
-| table_blocksize | 1024 | Size of blocks to write onto hdfs. For  more details, see [Table Block Size Configuration](./ddl-of-carbondata.md#table-block-size-configuration). |
+| table_blocksize | 1024 | Size of blocks to write onto hdfs. For more details, see [Table Block Size Configuration](./ddl-of-carbondata.md#table-block-size-configuration). |
 | table_blocklet_size | 64 | Size of blocklet to write. |
 | table_page_size_inmb | 0 | Size of each page in the carbon table. If the page size crosses this value before 32000 rows, the page will be cut at that many rows. Helps in keeping the page size within the cache size. |
-| local_dictionary_threshold | 10000 | Cardinality upto which the local dictionary can be generated. For  more details, see [Local Dictionary Configuration](./ddl-of-carbondata.md#local-dictionary-configuration). |
-| local_dictionary_enable | false | Enable local dictionary generation. For  more details, see [Local Dictionary Configuration](./ddl-of-carbondata.md#local-dictionary-configuration). |
-| sort_columns | all dimensions are sorted | Columns to include in sort and its order of sort. For  more details, see [Sort Columns Configuration](./ddl-of-carbondata.md#sort-columns-configuration). |
-| sort_scope | local_sort | Sort scope of the load.Options include no sort, local sort, batch sort, and global sort. For  more details, see [Sort Scope Configuration](./ddl-of-carbondata.md#sort-scope-configuration). |
-| long_string_columns | null | Comma separated string/char/varchar columns which are more than 32k length. For  more details, see [String longer than 32000 characters](./ddl-of-carbondata.md#string-longer-than-32000-characters). |
+| local_dictionary_threshold | 10000 | Cardinality up to which the local dictionary can be generated. For more details, see [Local Dictionary Configuration](./ddl-of-carbondata.md#local-dictionary-configuration). |
+| local_dictionary_enable | false | Enable local dictionary generation. For more details, see [Local Dictionary Configuration](./ddl-of-carbondata.md#local-dictionary-configuration). |
+| sort_columns | all dimensions are sorted | Columns to include in sort and its order of sort. For more details, see [Sort Columns Configuration](./ddl-of-carbondata.md#sort-columns-configuration). |
+| sort_scope | local_sort | Sort scope of the load. Options include no sort, local sort, batch sort, and global sort. For more details, see [Sort Scope Configuration](./ddl-of-carbondata.md#sort-scope-configuration). |
+| long_string_columns | null | Comma separated string/char/varchar columns whose length exceeds 32000 characters. For more details, see [String longer than 32000 characters](./ddl-of-carbondata.md#string-longer-than-32000-characters). |
 
+ **NOTE:** Please set long_string_columns for varchar columns, as shown in the example below.
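+
+ For example, for an SDK-written table with varchar columns named `varcharField` and `varcharField2` (illustrative names and location), the table can be created as:
+
+ ```
+ CREATE TABLE CARBON_VARCHAR_TABLE USING CARBON OPTIONS('long_string_columns'='varcharField,varcharField2') LOCATION '/path/to/sdk/files'
+ ```
+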
 ## Example 
 
 ```
- CREATE TABLE CARBON_TABLE (NAME  STRING) USING CARBON OPTIONS('table_block_size'='256')
+ CREATE TABLE CARBON_TABLE (NAME STRING) USING CARBON OPTIONS('table_block_size'='256')
 ```
 
 # Using DataFrame
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 56a15bd..8c02cb7 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -22,6 +22,7 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
@@ -38,6 +39,84 @@ import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
 
 class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
 
+
+  var writerOutputPath = new File(this.getClass.getResource("/").getPath
+          + "../../target/SparkCarbonFileFormat/SDKWriterOutput/").getCanonicalPath
+  // getCanonicalPath gives a path with \, but the code expects /.
+  writerOutputPath = writerOutputPath.replace("\\", "/")
+
+  def buildTestData(rows: Int,
+                    sortColumns: List[String]): Unit = {
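+    // JSON schema covering each primitive type, including a decimal(17,2) field and
+    // two varchar fields that exercise the long_string_columns error path below.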
+    val schema = new StringBuilder()
+            .append("[ \n")
+            .append("   {\"stringField\":\"string\"},\n")
+            .append("   {\"byteField\":\"byte\"},\n")
+            .append("   {\"shortField\":\"short\"},\n")
+            .append("   {\"intField\":\"int\"},\n")
+            .append("   {\"longField\":\"long\"},\n")
+            .append("   {\"doubleField\":\"double\"},\n")
+            .append("   {\"floatField\":\"float\"},\n")
+            .append("   {\"decimalField\":\"decimal(17,2)\"},\n")
+            .append("   {\"boolField\":\"boolean\"},\n")
+            .append("   {\"dateField\":\"DATE\"},\n")
+            .append("   {\"timeField\":\"TIMESTAMP\"},\n")
+            .append("   {\"varcharField\":\"varchar\"},\n")
+            .append("   {\"varcharField2\":\"varchar\"}\n")
+            .append("]")
+            .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(writerOutputPath)
+                .sortBy(sortColumns.toArray)
+                .uniqueIdentifier(System.currentTimeMillis)
+                .withBlockSize(2)
+                .withCsvInput(Schema.parseJson(schema))
+                .writtenBy("TestNonTransactionalCarbonTable")
+                .build()
+      var i = 0
+      while (i < rows) {
+        writer.write(Array[String]("robot" + i,
+          String.valueOf(i / 100),
+          String.valueOf(i / 100),
+          String.valueOf(i),
+          String.valueOf(i),
+          String.valueOf(i),
+          String.valueOf(i),
+          String.valueOf(i),
+          "true",
+          "2019-03-02",
+          "2019-02-12 03:03:34",
+          "var1",
+          "var2"))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case ex: Throwable => throw new RuntimeException(ex)
+    }
+  }
+
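+  // Reading SDK-written varchar columns through the Spark datasource should fail
+  // with a guiding message unless long_string_columns is set.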
+  test("Carbon DataSource read SDK data with varchar") {
+    import spark._
+    FileUtils.deleteDirectory(new File(writerOutputPath))
+    val num = 10000
+    buildTestData(num, List("stringField", "intField"))
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      sql("DROP TABLE IF EXISTS carbontable_varchar")
+      sql("DROP TABLE IF EXISTS carbontable_varchar2")
+      sql(s"CREATE TABLE carbontable_varchar USING CARBON LOCATION '$writerOutputPath'")
+      val e = intercept[Exception] {
+        sql("SELECT COUNT(*) FROM carbontable_varchar").show()
+      }
+      assert(e.getMessage.contains("present in the index file is varchar, which is not the same as the datatype of the column with the same name present in the table, because carbon converts its varchar to spark's string. Please set long_string_columns for the varchar column"))
+
+      sql(s"CREATE TABLE carbontable_varchar2 USING CARBON OPTIONS('long_String_columns'='varcharField,varcharField2') LOCATION '$writerOutputPath'")
+      checkAnswer(sql("SELECT COUNT(*) FROM carbontable_varchar2"), Seq(Row(num)))
+    }
+  }
+
   test("test write using dataframe") {
     import spark.implicits._
     val df = spark.sparkContext.parallelize(1 to 10)
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index ab375f8..99f486d 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -20,6 +20,8 @@ package org.apache.carbondata.sdk.file;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -80,6 +82,24 @@ public class Field {
       this.type = DataTypes.DOUBLE;
     } else if (type.equalsIgnoreCase("binary")) {
       this.type = DataTypes.BINARY;
+    } else if (type.toLowerCase().startsWith("decimal")) {
+      if ("decimal".equalsIgnoreCase(type.toLowerCase())) {
+        this.type = DataTypes.createDefaultDecimalType();
+      } else {
+        try {
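+          // Extract "precision,scale" from a spec such as "decimal(17, 2)"; any
+          // surrounding whitespace is trimmed before parsing the two integers.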
+          Matcher m = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(type.toLowerCase());
+          m.find();
+          String matchedString = m.group(1);
+          String[] precisionAndScale = matchedString.split(",");
+          precision = Integer.parseInt(precisionAndScale[0].trim());
+          scale = Integer.parseInt(precisionAndScale[1].trim());
+          this.type = DataTypes.createDecimalType(precision, scale);
+        } catch (Exception e) {
+          throw new IllegalArgumentException("unsupported data type: " + type
+              + ". Please use decimal or decimal(precision,scale), "
+              + "for example: decimal(10,2)", e);
+        }
+      }
     } else if (type.equalsIgnoreCase("array")) {
       this.type = DataTypes.createDefaultArrayType();
     } else if (type.equalsIgnoreCase("struct")) {
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index 27b4e3a..1452058 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
 import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -107,6 +108,71 @@ public class CSVCarbonWriterTest {
   }
 
   @Test
+  public void testWriteJsonSchemaWithDefaultDecimal() {
+    String jsonSchema = new StringBuilder()
+        .append("[ \n")
+        .append("   {\"name\":\"string\"},\n")
+        .append("   {\"age\":\"int\"},\n")
+        .append("   {\"height\":\"double\"},\n")
+        .append("   {\"decimalField\":\"decimal\"}\n")
+        .append("]")
+        .toString();
+    Schema schema = Schema.parseJson(jsonSchema);
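+    // The plain "decimal" spec falls back to the default decimal type, which these
+    // assertions pin to precision 10 and scale 2.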
+    assert (10 == ((DecimalType) schema.getFields()[3].getDataType()).getPrecision());
+    assert (2 == ((DecimalType) schema.getFields()[3].getDataType()).getScale());
+  }
+
+  @Test
+  public void testWriteJsonSchemaWithCustomDecimal() {
+    String jsonSchema = new StringBuilder()
+        .append("[ \n")
+        .append("   {\"name\":\"string\"},\n")
+        .append("   {\"age\":\"int\"},\n")
+        .append("   {\"height\":\"double\"},\n")
+        .append("   {\"decimalField\":\"decimal(17,3)\"}\n")
+        .append("]")
+        .toString();
+    Schema schema = Schema.parseJson(jsonSchema);
+    assert (17 == ((DecimalType) schema.getFields()[3].getDataType()).getPrecision());
+    assert (3 == ((DecimalType) schema.getFields()[3].getDataType()).getScale());
+  }
+
+  @Test
+  public void testWriteJsonSchemaWithCustomDecimalAndSpace() {
+    String jsonSchema = new StringBuilder()
+        .append("[ \n")
+        .append("   {\"name\":\"string\"},\n")
+        .append("   {\"age\":\"int\"},\n")
+        .append("   {\"height\":\"double\"},\n")
+        .append("   {\"decimalField\":\"decimal( 17, 3)\"}\n")
+        .append("]")
+        .toString();
+    Schema schema = Schema.parseJson(jsonSchema);
+    assert (17 == ((DecimalType) schema.getFields()[3].getDataType()).getPrecision());
+    assert (3 == ((DecimalType) schema.getFields()[3].getDataType()).getScale());
+  }
+
+  @Test
+  public void testWriteJsonSchemaWithImproperDecimal() {
+    String jsonSchema = new StringBuilder()
+        .append("[ \n")
+        .append("   {\"name\":\"string\"},\n")
+        .append("   {\"age\":\"int\"},\n")
+        .append("   {\"height\":\"double\"},\n")
+        .append("   {\"decimalField\":\"decimal( 17, )\"}\n")
+        .append("]")
+        .toString();
+    try {
+      Schema.parseJson(jsonSchema);
+      assert (false);
+    } catch (Exception e) {
+      assert (e.getMessage().contains("unsupported data type: decimal( 17, ). " +
+          "Please use decimal or decimal(precision,scale), " +
+          "for example: decimal(10,2)"));
+    }
+  }
+
+  @Test
   public void testWriteFilesBuildWithJsonSchema() throws IOException, InvalidLoadOptionException, InterruptedException {
     String path = "./testWriteFilesJsonSchema";
     FileUtils.deleteDirectory(new File(path));