You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/08 05:46:16 UTC

carbondata git commit: [CARBONDATA-2879] [CARBONDATA-2918] support sort scope for sdk and make sort column case insensitive

Repository: carbondata
Updated Branches:
  refs/heads/master 6e50c1c6f -> f5c7a19b8


[CARBONDATA-2879] [CARBONDATA-2918] support sort scope for sdk and make sort column case insensitive

problem: [CARBONDATA-2879] SDK doesn't support batch sort and no sort, as we don't have any API support for sort scope.

Solution: provide sort_scope in the existing load_options, so the user can decide which sort_scope to use.
Currently supported values are batch_sort, local_sort and no_sort.
Global sort is not applicable in the SDK scenario, hence it is not supported.
Updated the method header and doc also

problem: [CARBONDATA-2918]
Currently in CarbonWriterBuilder,
sortColumnsList.indexOf(field.getFieldName()) returns -1 due to a case-sensitivity issue, so one of the entries in the sort-columns array becomes null. As a result one of the column schemas becomes null,
and when that column schema is accessed, we get an NPE.

solution: Change the parent column names (as sort columns are supported only for parent columns) to lower case when the schema is received from the user,
and also change the sort columns to lower case when received from the user.

This closes #2692


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f5c7a19b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f5c7a19b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f5c7a19b

Branch: refs/heads/master
Commit: f5c7a19b86e698f00e53390995b28113c787bc5a
Parents: 6e50c1c
Author: ajantha-bhat <aj...@gmail.com>
Authored: Tue Sep 4 14:55:47 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Sep 8 11:16:00 2018 +0530

----------------------------------------------------------------------
 docs/sdk-guide.md                               | 26 +++----
 .../TestNonTransactionalCarbonTable.scala       | 54 +++++++++++++--
 .../loading/model/CarbonLoadModelBuilder.java   |  2 +
 .../loading/parser/impl/JsonRowParser.java      |  5 +-
 .../sdk/file/CarbonWriterBuilder.java           | 71 ++++++++++++++------
 .../org/apache/carbondata/sdk/file/Field.java   |  5 ++
 6 files changed, 121 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5c7a19b/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index d786406..dd28e51 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -339,6 +339,7 @@ public CarbonWriterBuilder taskNo(long taskNo);
 *                g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
 *                h. quotechar
 *                i. escapechar
+*                j. sort_scope -- "local_sort", "no_sort", "batch_sort"
 *
 *                Default values are as follows.
 *
@@ -351,6 +352,7 @@ public CarbonWriterBuilder taskNo(long taskNo);
 *                g. complex_delimiter_level_2 -- ":"
 *                h. quotechar -- "\""
 *                i. escapechar -- "\\"
+*                j. sort_scope -- "local_sort"
 *
 * @return updated CarbonWriterBuilder
 */
@@ -359,18 +361,18 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options);
 
 ```
 /**
- * To support the table properties for sdk writer
- *
- * @param options key,value pair of create table properties.
- * supported keys values are
- * a. blocksize -- [1-2048] values in MB. Default value is 1024
- * b. blockletsize -- values in MB. Default value is 64 MB
- * c. localDictionaryThreshold -- positive value, default is 10000
- * d. enableLocalDictionary -- true / false. Default is false
- * e. sortcolumns -- comma separated column. "c1,c2". Default all dimensions are sorted.
- *
- * @return updated CarbonWriterBuilder
- */
+* To support the table properties for sdk writer
+*
+* @param options key,value pair of create table properties.
+* supported keys values are
+* a. table_blocksize -- [1-2048] values in MB. Default value is 1024
+* b. table_blocklet_size -- values in MB. Default value is 64 MB
+* c. local_dictionary_threshold -- positive value, default is 10000
+* d. local_dictionary_enable -- true / false. Default is false
+* e. sort_columns -- comma separated column. "c1,c2". Default all dimensions are sorted.
+*
+* @return updated CarbonWriterBuilder
+*/
 public CarbonWriterBuilder withTableProperties(Map<String, String> options);
 ```
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5c7a19b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 113066a..29ea755 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -126,7 +126,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       sortColumns: List[String]): Any = {
     val schema = new StringBuilder()
       .append("[ \n")
-      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"NaMe\":\"string\"},\n")
       .append("   {\"age\":\"int\"},\n")
       .append("   {\"height\":\"double\"}\n")
       .append("]")
@@ -350,7 +350,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
   test(" test csv fileheader for transactional table") {
     FileUtils.deleteDirectory(new File(writerPath))
-    buildTestDataWithSameUUID(3, false, null, List("name"))
+    buildTestDataWithSameUUID(3, false, null, List("Name"))
     assert(new File(writerPath).exists())
 
     sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -379,7 +379,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
   test("test count star with multiple loads files with same schema and UUID") {
     FileUtils.deleteDirectory(new File(writerPath))
-    buildTestDataWithSameUUID(3, false, null, List("name"))
+    buildTestDataWithSameUUID(3, false, null, List("namE"))
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
     sql(
@@ -2356,6 +2356,46 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Timestamp.valueOf("1970-01-02 16:00:00"), Row(Timestamp.valueOf("1970-01-02 16:00:00")))))
   }
 
+  test("test Sort Scope with SDK") {
+    cleanTestData()
+    // test with no_sort
+    val options = Map("sort_scope" -> "no_sort").asJava
+    val fields: Array[Field] = new Array[Field](4)
+    fields(0) = new Field("stringField", DataTypes.STRING)
+    fields(1) = new Field("intField", DataTypes.INT)
+    val writer: CarbonWriter = CarbonWriter.builder
+      .outputPath(writerPath)
+      .withLoadOptions(options)
+      .buildWriterForCSVInput(new Schema(fields))
+    writer.write(Array("carbon", "1"))
+    writer.write(Array("hydrogen", "10"))
+    writer.write(Array("boron", "4"))
+    writer.write(Array("zirconium", "5"))
+    writer.close()
+
+    // read no sort data
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION '$writerPath' """
+        .stripMargin)
+    checkAnswer(sql("select * from sdkTable"),
+      Seq(Row("carbon", 1), Row("hydrogen", 10), Row("boron", 4), Row("zirconium", 5)))
+
+    // write local sort data
+    val writer1: CarbonWriter = CarbonWriter.builder
+      .outputPath(writerPath)
+      .buildWriterForCSVInput(new Schema(fields))
+    writer1.write(Array("carbon", "1"))
+    writer1.write(Array("hydrogen", "10"))
+    writer1.write(Array("boron", "4"))
+    writer1.write(Array("zirconium", "5"))
+    writer1.close()
+    // read both-no sort and local sort data
+    checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(8)))
+    sql("DROP TABLE sdkTable")
+    cleanTestData()
+  }
+
   test("test LocalDictionary with True") {
     FileUtils.deleteDirectory(new File(writerPath))
     val builder = CarbonWriter.builder.isTransactionalTable(false)
@@ -2379,10 +2419,10 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
   test("test LocalDictionary with custom Threshold") {
     FileUtils.deleteDirectory(new File(writerPath))
     val tablePropertiesMap: util.Map[String, String] =
-      Map("blocksize" -> "12",
-        "sortcolumns" -> "name",
-        "localDictionaryThreshold" -> "200",
-        "enableLocalDictionary" -> "true").asJava
+      Map("table_blocksize" -> "12",
+        "sort_columns" -> "name",
+        "local_dictionary_threshold" -> "200",
+        "local_dictionary_enable" -> "true").asJava
     val builder = CarbonWriter.builder.isTransactionalTable(false)
       .withTableProperties(tablePropertiesMap)
       .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5c7a19b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 6856d68..2ebcb29 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -74,6 +74,8 @@ public class CarbonLoadModelBuilder {
       optionsFinal.put("fileheader", Strings.mkString(columns, ","));
     }
     optionsFinal.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options, table));
+    optionsFinal.put("sort_scope",
+        Maps.getOrDefault(options, "sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT));
     CarbonLoadModel model = new CarbonLoadModel();
     model.setCarbonTransactionalTable(table.isTransactionalTable());
     model.setFactTimeStamp(UUID);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5c7a19b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java
index c4f6074..119ae67 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -59,7 +60,9 @@ public class JsonRowParser implements RowParser {
       if (jsonNodeMap == null) {
         return null;
       }
-      return jsonToCarbonRecord(jsonNodeMap, dataFields);
+      Map<String, Object> jsonNodeMapCaseInsensitive = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+      jsonNodeMapCaseInsensitive.putAll(jsonNodeMap);
+      return jsonToCarbonRecord(jsonNodeMapCaseInsensitive, dataFields);
     } catch (IOException e) {
       throw new IOException("Failed to parse Json String: " + e.getMessage());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5c7a19b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index b38b491..76dd7aa 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -45,6 +45,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonSessionInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.ThriftWriter;
@@ -96,6 +97,11 @@ public class CarbonWriterBuilder {
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder sortBy(String[] sortColumns) {
+    if (sortColumns != null) {
+      for (int i = 0; i < sortColumns.length; i++) {
+        sortColumns[i] = sortColumns[i].toLowerCase();
+      }
+    }
     this.sortColumns = sortColumns;
     return this;
   }
@@ -233,6 +239,7 @@ public class CarbonWriterBuilder {
    * g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
    * h. quotechar
    * i. escapechar
+   * j. sort_scope -- "local_sort", "no_sort", "batch_sort"
    *
    * Default values are as follows.
    *
@@ -245,17 +252,13 @@ public class CarbonWriterBuilder {
    * g. complex_delimiter_level_2 -- ":"
    * h. quotechar -- "\""
    * i. escapechar -- "\\"
+   * j. sort_scope -- "local_sort"
    *
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder withLoadOptions(Map<String, String> options) {
     Objects.requireNonNull(options, "Load options should not be null");
     //validate the options.
-    if (options.size() > 9) {
-      throw new IllegalArgumentException("Supports only nine options now. "
-          + "Refer method header or documentation");
-    }
-
     for (String option: options.keySet()) {
       if (!option.equalsIgnoreCase("bad_records_logger_enable") &&
           !option.equalsIgnoreCase("bad_records_action") &&
@@ -265,9 +268,19 @@ public class CarbonWriterBuilder {
           !option.equalsIgnoreCase("complex_delimiter_level_1") &&
           !option.equalsIgnoreCase("complex_delimiter_level_2") &&
           !option.equalsIgnoreCase("quotechar") &&
-          !option.equalsIgnoreCase("escapechar")) {
-        throw new IllegalArgumentException("Unsupported options. "
-            + "Refer method header or documentation");
+          !option.equalsIgnoreCase("escapechar") &&
+          !option.equalsIgnoreCase("sort_scope")) {
+        throw new IllegalArgumentException("Unsupported option:" + option
+            + ". Refer method header or documentation");
+      }
+    }
+    // validate sort scope
+    String sortScope = options.get("sort_scope");
+    if (sortScope != null) {
+      if ((!CarbonUtil.isValidSortOption(sortScope))) {
+        throw new IllegalArgumentException("Invalid Sort Scope Option: " + sortScope);
+      } else if (sortScope.equalsIgnoreCase("global_sort")) {
+        throw new IllegalArgumentException("global sort is not supported");
       }
     }
 
@@ -283,11 +296,11 @@ public class CarbonWriterBuilder {
    *
    * @param options key,value pair of create table properties.
    * supported keys values are
-   * a. blocksize -- [1-2048] values in MB. Default value is 1024
-   * b. blockletsize -- values in MB. Default value is 64 MB
-   * c. localDictionaryThreshold -- positive value, default is 10000
-   * d. enableLocalDictionary -- true / false. Default is false
-   * e. sortcolumns -- comma separated column. "c1,c2". Default all dimensions are sorted.
+   * a. table_blocksize -- [1-2048] values in MB. Default value is 1024
+   * b. table_blocklet_size -- values in MB. Default value is 64 MB
+   * c. local_dictionary_threshold -- positive value, default is 10000
+   * d. local_dictionary_enable -- true / false. Default is false
+   * e. sort_columns -- comma separated column. "c1,c2". Default all dimensions are sorted.
    *
    * @return updated CarbonWriterBuilder
    */
@@ -300,8 +313,8 @@ public class CarbonWriterBuilder {
     }
 
     Set<String> supportedOptions = new HashSet<>(Arrays
-        .asList("blocksize", "blockletsize", "localdictionarythreshold", "enablelocaldictionary",
-            "sortcolumns"));
+        .asList("table_blocksize", "table_blocklet_size", "local_dictionary_threshold",
+            "local_dictionary_enable", "sort_columns"));
 
     for (String key : options.keySet()) {
       if (!supportedOptions.contains(key.toLowerCase())) {
@@ -311,15 +324,15 @@ public class CarbonWriterBuilder {
     }
 
     for (Map.Entry<String, String> entry : options.entrySet()) {
-      if (entry.getKey().equalsIgnoreCase("equalsIgnoreCase")) {
+      if (entry.getKey().equalsIgnoreCase("table_blocksize")) {
         this.withBlockSize(Integer.parseInt(entry.getValue()));
-      } else if (entry.getKey().equalsIgnoreCase("blockletsize")) {
+      } else if (entry.getKey().equalsIgnoreCase("table_blocklet_size")) {
         this.withBlockletSize(Integer.parseInt(entry.getValue()));
-      } else if (entry.getKey().equalsIgnoreCase("localDictionaryThreshold")) {
+      } else if (entry.getKey().equalsIgnoreCase("local_dictionary_threshold")) {
         this.localDictionaryThreshold(Integer.parseInt(entry.getValue()));
-      } else if (entry.getKey().equalsIgnoreCase("enableLocalDictionary")) {
+      } else if (entry.getKey().equalsIgnoreCase("local_dictionary_enable")) {
         this.enableLocalDictionary((entry.getValue().equalsIgnoreCase("true")));
-      } else {
+      } else if (entry.getKey().equalsIgnoreCase("sort_columns")) {
         //sort columns
         String[] sortColumns = entry.getValue().split(",");
         this.sortBy(sortColumns);
@@ -534,14 +547,13 @@ public class CarbonWriterBuilder {
 
   public CarbonLoadModel buildLoadModel(Schema carbonSchema)
       throws IOException, InvalidLoadOptionException {
-    this.schema = carbonSchema;
+    this.schema = schemaFieldNameToLowerCase(carbonSchema);
     // build CarbonTable using schema
     CarbonTable table = buildCarbonTable();
     if (persistSchemaFile) {
       // we are still using the traditional carbon table folder structure
       persistSchemaFile(table, CarbonTablePath.getSchemaFilePath(path));
     }
-
     // build LoadModel
     return buildLoadModel(table, UUID, taskNo, options);
   }
@@ -722,4 +734,19 @@ public class CarbonWriterBuilder {
     setCsvHeader(build);
     return build;
   }
+
+  /* loop through all the parent column and change fields name lower case.
+  * this is to match with sort column case */
+  private Schema schemaFieldNameToLowerCase(Schema schema) {
+    if (schema == null) {
+      return null;
+    }
+    Field[] fields =  schema.getFields();
+    for (int i = 0; i < fields.length; i++) {
+      if (fields[i] != null) {
+        fields[i].updateNameToLowerCase();
+      }
+    }
+    return new Schema(fields);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5c7a19b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index b1a9e69..1c5ab52 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -213,4 +213,9 @@ public class Field {
   public void setColumnComment(String columnComment) {
     this.columnComment = columnComment;
   }
+
+  /*can use to change the case of the schema */
+  public void updateNameToLowerCase() {
+    this.name = name.toLowerCase();
+  }
 }