Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 15:50:46 UTC

[44/45] carbondata git commit: [CARBONDATA-2994] unify badrecordpath property name for create and load

[CARBONDATA-2994] unify badrecordpath property name for create and load

Problem:
Currently the bad records path can be specified in both CREATE TABLE and LOAD DATA, but the property names differ: CREATE uses bad_records_path while LOAD uses bad_record_path. This inconsistency can confuse users.

Solution: Use bad_record_path as the property name for CREATE as well, so that both CREATE and LOAD use the same name.
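After this change, both statements accept the same property name. A sketch of the unified usage (the table name, columns, and paths below are illustrative, not taken from the patch):

```sql
-- Table-level property, set once at creation time:
CREATE TABLE IF NOT EXISTS sales (
  id BIGINT,
  sale_date TIMESTAMP,
  price DOUBLE
)
STORED BY 'carbondata'
TBLPROPERTIES ('BAD_RECORD_PATH'='/opt/badrecords');

-- Load-level option, now spelled identically; if given, it takes
-- precedence over the table property:
LOAD DATA LOCAL INPATH '/opt/data/sample.csv' INTO TABLE sales
OPTIONS ('BAD_RECORDS_ACTION'='REDIRECT', 'BAD_RECORD_PATH'='/opt/badrecords');
```

Per the docs touched in this patch, resolution order is: load option first, then the table property, then the carbon.badRecords.location system property.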

This closes #2799


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/19097f27
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/19097f27
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/19097f27

Branch: refs/heads/branch-1.5
Commit: 19097f272fe3227c71c86338bb8bf788e87cd4aa
Parents: fa08825
Author: kunal642 <ku...@gmail.com>
Authored: Fri Oct 5 14:57:26 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Oct 8 14:29:59 2018 +0530

----------------------------------------------------------------------
 docs/ddl-of-carbondata.md                                | 11 +++++++++++
 docs/dml-of-carbondata.md                                |  8 --------
 .../carbondata/hadoop/api/CarbonTableOutputFormat.java   |  2 +-
 .../StandardPartitionBadRecordLoggerTest.scala           |  2 +-
 .../org/apache/carbondata/spark/StreamingOption.scala    |  2 +-
 .../sql/execution/command/carbonTableSchemaCommon.scala  |  4 ++--
 .../command/table/CarbonDescribeFormattedCommand.scala   |  7 +++++++
 .../spark/carbondata/BadRecordPathLoadOptionTest.scala   |  4 ++--
 .../spark/carbondata/TestStreamingTableOperation.scala   |  8 ++++----
 .../carbondata/processing/util/CarbonBadRecordUtil.java  |  2 +-
 10 files changed, 30 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/19097f27/docs/ddl-of-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 22d754a..c1a891d 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -33,6 +33,7 @@ CarbonData DDL statements are documented here,which includes:
   * [Hive/Parquet folder Structure](#support-flat-folder-same-as-hiveparquet)
   * [Extra Long String columns](#string-longer-than-32000-characters)
   * [Compression for Table](#compression-for-table)
+  * [Bad Records Path](#bad-records-path)
 * [CREATE TABLE AS SELECT](#create-table-as-select)
 * [CREATE EXTERNAL TABLE](#create-external-table)
   * [External Table on Transactional table location](#create-external-table-on-managed-table-data-location)
@@ -454,6 +455,16 @@ CarbonData DDL statements are documented here,which includes:
      ```
      carbon.column.compressor=zstd
      ```
+     
+   - ##### Bad Records Path
+     This property is used to specify the location where bad records would be written.
+     As the table path remains the same after rename therefore the user can use this property to
+     specify bad records path for the table at the time of creation, so that the same path can 
+     be later viewed in table description for reference.
+     
+     ```
+       TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords'')
+     ```
 
 ## CREATE TABLE AS SELECT
   This function allows user to create a Carbon table from any of the Parquet/Hive/Carbon table. This is beneficial when the user wants to create Carbon table from any other Parquet/Hive table and use the Carbon query engine to query and achieve better query results for cases where Carbon is faster than other file formats. Also this feature can be used for backing up the data.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/19097f27/docs/dml-of-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index db7c118..393ebd3 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -240,14 +240,6 @@ CarbonData DML statements are documented here,which includes:
   * Since Bad Records Path can be specified in create, load and carbon properties. 
     Therefore, value specified in load will have the highest priority, and value specified in carbon properties will have the least priority.
 
-   **Bad Records Path:**
-         This property is used to specify the location where bad records would be written.
-        
-
-   ```
-   TBLPROPERTIES('BAD_RECORDS_PATH'='/opt/badrecords'')
-   ```
-
   Example:
 
   ```

http://git-wip-us.apache.org/repos/asf/carbondata/blob/19097f27/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 762983b..f0ad94d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -374,7 +374,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     String badRecordsPath = conf.get(BAD_RECORD_PATH);
     if (StringUtils.isEmpty(badRecordsPath)) {
       badRecordsPath =
-          carbonTable.getTableInfo().getFactTable().getTableProperties().get("bad_records_path");
+          carbonTable.getTableInfo().getFactTable().getTableProperties().get("bad_record_path");
       if (StringUtils.isEmpty(badRecordsPath)) {
         badRecordsPath = carbonProperty
             .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, carbonProperty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/19097f27/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
index d9e5d3c..9689f3d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
@@ -44,7 +44,7 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter
   test("test partition redirect") {
     sql(
       s"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
-          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='$warehouse')""")
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORD_PATH'='$warehouse')""")
 
     val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
     sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/19097f27/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
index 2402d83..087bef2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
@@ -56,7 +56,7 @@ class StreamingOption(val userInputMap: Map[String, String]) {
 
   lazy val badRecordsPath: String =
     userInputMap
-      .getOrElse("bad_records_path", CarbonProperties.getInstance()
+      .getOrElse("bad_record_path", CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
           CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/19097f27/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index da22658..5e0fe8b 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -844,7 +844,7 @@ class TableNewProcessor(cm: TableModel) {
       cm.tableName,
       tableSchema.getTableId,
       cm.databaseNameOp.getOrElse("default"))
-    tablePropertiesMap.put("bad_records_path", badRecordsPath)
+    tablePropertiesMap.put("bad_record_path", badRecordsPath)
     tableSchema.setTableProperties(tablePropertiesMap)
     if (cm.bucketFields.isDefined) {
       val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
@@ -898,7 +898,7 @@ class TableNewProcessor(cm: TableModel) {
       tableId: String,
       databaseName: String): String = {
     val badRecordsPath = tablePropertiesMap.asScala
-      .getOrElse("bad_records_path", CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
+      .getOrElse("bad_record_path", CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
     if (badRecordsPath == null || badRecordsPath.isEmpty) {
       CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/19097f27/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 9b9e8bd..029c0e3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.table
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.lang.StringUtils
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -217,6 +218,12 @@ private[sql] case class CarbonDescribeFormattedCommand(
     results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
       relation.carbonTable.getTableName).asScala
       .map(column => column).mkString(","), ""))
+
+    val bad_record_path = relation.carbonTable.getTableInfo.getFactTable
+      .getTableProperties.get("bad_record_path")
+    if (!StringUtils.isEmpty(bad_record_path)) {
+      results ++= Seq(("BAD_RECORD_PATH", bad_record_path, ""))
+    }
     // add columns configured in column meta cache
     if (null != tblProps.get(CarbonCommonConstants.COLUMN_META_CACHE)) {
       results ++=

http://git-wip-us.apache.org/repos/asf/carbondata/blob/19097f27/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
index dfefa9b..e3e261f 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
@@ -44,7 +44,7 @@ class BadRecordPathLoadOptionTest extends Spark2QueryTest with BeforeAndAfterAll
   test("data load log file and csv file written at the configured location") {
     sql(
       s"""CREATE TABLE IF NOT EXISTS salestest(ID BigInt, date Timestamp, country String,
-          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='$warehouse')""")
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORD_PATH'='$warehouse')""")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
     val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
@@ -64,7 +64,7 @@ class BadRecordPathLoadOptionTest extends Spark2QueryTest with BeforeAndAfterAll
   def isFilesWrittenAtBadStoreLocation: Boolean = {
     val badStorePath =
       CarbonEnv.getCarbonTable(Some("default"), "salestest")(sqlContext.sparkSession).getTableInfo
-        .getFactTable.getTableProperties.get("bad_records_path") + "/0/0"
+        .getFactTable.getTableProperties.get("bad_record_path") + "/0/0"
     val carbonFile: CarbonFile = FileFactory
       .getCarbonFile(badStorePath, FileFactory.getFileType(badStorePath))
     var exists: Boolean = carbonFile.exists()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/19097f27/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 607c429..62c0221 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -1880,7 +1880,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         |  'interval'='1 seconds',
         |  'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE',
         |  'BAD_RECORDS_ACTION' = 'FORCE',
-        |  'BAD_RECORDS_PATH'='$warehouse')
+        |  'BAD_RECORD_PATH'='$warehouse')
         |AS
         |  SELECT *
         |  FROM source
@@ -1894,7 +1894,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         |  'interval'='1 seconds',
         |  'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE',
         |  'BAD_RECORDS_ACTION' = 'FORCE',
-        |  'BAD_RECORDS_PATH'='$warehouse')
+        |  'BAD_RECORD_PATH'='$warehouse')
         |AS
         |  SELECT *
         |  FROM source
@@ -2554,7 +2554,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
          | )
          | STORED BY 'carbondata'
          | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
-         | 'sort_columns'='name', 'dictionary_include'='city,register', 'BAD_RECORDS_PATH'='$badRecordFilePath')
+         | 'sort_columns'='name', 'dictionary_include'='city,register', 'BAD_RECORD_PATH'='$badRecordFilePath')
          | """.stripMargin)
 
     if (withBatchLoad) {
@@ -2583,7 +2583,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
          | )
          | STORED BY 'carbondata'
          | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
-         | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated', 'BAD_RECORDS_PATH'='$badRecordFilePath')
+         | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated', 'BAD_RECORD_PATH'='$badRecordFilePath')
          | """.stripMargin)
 
     if (withBatchLoad) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/19097f27/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
index 55bc580..ecc6afb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
@@ -130,7 +130,7 @@ public class CarbonBadRecordUtil {
   public static String getBadRecordsPath(Map<String, String> loadOptions, CarbonTable table) {
     String badRecordsFromLoad = loadOptions.get("bad_record_path");
     String badRecordsFromCreate =
-        table.getTableInfo().getFactTable().getTableProperties().get("bad_records_path");
+        table.getTableInfo().getFactTable().getTableProperties().get("bad_record_path");
     String badRecordsPath;
     if (StringUtils.isNotEmpty(badRecordsFromLoad)) {
       badRecordsPath =