You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2019/12/29 05:38:37 UTC

[carbondata] branch master updated: [CARBONDATA-3618] Update query should throw exception if key has more than one value

This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 01d0614  [CARBONDATA-3618] Update query should throw exception if key has more than one value
01d0614 is described below

commit 01d06147d301dc9858278e07c66a040bee75992d
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Thu Dec 12 20:02:39 2019 +0800

    [CARBONDATA-3618] Update query should throw exception if key has more than one value
    
    problem: Update query should throw exception if key has more than one value
    
    cause : Currently update command is adding multiple entries of key instead of throwing exception when update result key has more than one value to replace.
    
    This closes #3509
---
 .../core/constants/CarbonCommonConstants.java      | 17 +++++++++++
 .../carbondata/core/util/CarbonProperties.java     | 17 +++++++++++
 docs/configuration-parameters.md                   |  1 +
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  | 33 ++++++++++++++++++++++
 .../mutation/CarbonProjectForUpdateCommand.scala   | 20 ++++++++++++-
 5 files changed, 87 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1424e1a..0eed0fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -890,6 +890,23 @@ public final class CarbonCommonConstants {
   public static final String CARBON_HORIZONTAL_COMPACTION_ENABLE_DEFAULT = "true";
 
   /**
+   * For validating the key to value mapping in case of update.
+   * Update operation should throw exception if one key has more than one value to update.
+   * This validation might have slight degrade in performance of update query.
+   * If user knows that key value mapping is correct.
+   * can disable this validation for better update performance.
+   */
+  @CarbonProperty
+  public static final String CARBON_UPDATE_CHECK_UNIQUE_VALUE =
+      "carbon.update.check.unique.value";
+
+  /**
+   * Default validation of unique value check enabled for the update.
+   */
+  public static final String CARBON_UPDATE_CHECK_UNIQUE_VALUE_DEFAULT = "true";
+
+
+  /**
    * Which storage level to persist dataset when updating data
    * with 'carbon.update.persist.enable'='true'
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 35f11f3..8e5e9f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1802,6 +1802,23 @@ public final class CarbonProperties {
   }
 
   /**
+   * Validate and get unique value check enabled
+   *
+   * @return boolean
+   */
+  public static Boolean isUniqueValueCheckEnabled() {
+    String needValidate = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_UPDATE_CHECK_UNIQUE_VALUE);
+    if (needValidate == null) {
+      return Boolean
+          .parseBoolean(CarbonCommonConstants.CARBON_UPDATE_CHECK_UNIQUE_VALUE_DEFAULT);
+    } else {
+      // return false only if false is set. any other case return true
+      return !needValidate.equalsIgnoreCase("false");
+    }
+  }
+
+  /**
    * get local dictionary size threshold in mb.
    */
   private void validateAndGetLocalDictionarySizeThresholdInMB() {
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 5df3c09..16fdf68 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -153,6 +153,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.insert.storage.level | MEMORY_AND_DISK | Storage level to persist dataset of a RDD/dataframe. Applicable when ***carbon.insert.persist.enable*** is **true**, if user's executor has less memory, set this parameter to 'MEMORY_AND_DISK_SER' or other storage level to correspond to different environment. [See detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). |
 | carbon.update.persist.enable | true | Configuration to enable the dataset of RDD/dataframe to persist data. Enabling this will reduce the execution time of UPDATE operation. |
 | carbon.update.storage.level | MEMORY_AND_DISK | Storage level to persist dataset of a RDD/dataframe. Applicable when ***carbon.update.persist.enable*** is **true**, if user's executor has less memory, set this parameter to 'MEMORY_AND_DISK_SER' or other storage level to correspond to different environment. [See detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). |
+| carbon.update.check.unique.value | true | By default this property is true, so update will validate key value mapping. This validation might have slight degrade in performance of update query. If user knows that key value mapping is correct, can disable this validation for better update performance by setting this property to false. |
 
 
 ##  Dynamic Configuration In CarbonData Using SET-RESET
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 0e7f41a..fedfa70 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -96,6 +96,39 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("""drop table if exists iud.dest22""")
   }
 
+  test("update with subquery with more than one value for key") {
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+    sql("create table t1 (age int, name string) stored by 'carbondata'")
+    sql("insert into t1 select 1, 'aa'")
+    sql("insert into t1 select 2, 'aa'")
+    sql("create table t2 (age int, name string) stored by 'carbondata'")
+    sql("insert into t2 select 1, 'Andy'")
+    sql("insert into t2 select 2, 'Andy'")
+    sql("insert into t2 select 1, 'aa'")
+    sql("insert into t2 select 3, 'aa'")
+    val exception = intercept[RuntimeException] {
+      sql("update t1 set (age) = (select t2.age from t2 where t2.name = 'Andy') where t1.age = 1 ").show(false)
+    }
+    assertResult(
+      "Update operation failed.  update cannot be supported for 1 to N mapping, as more than one " +
+      "value present for the update key")(exception.getMessage)
+    // Test carbon property
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_UPDATE_CHECK_UNIQUE_VALUE, "false")
+    // update  should not throw exception
+    sql("update t1 set (age) = (select t2.age from t2 where t2.name = 'Andy') where t1.age = 1 ").show(false)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_UPDATE_CHECK_UNIQUE_VALUE, "true")
+    // test join scenario
+    val exception1 = intercept[RuntimeException] {
+      sql("update t1 set (age) = (select t2.age from t2 where t2.name = t1.name) ").show(false)
+    }
+    assertResult(
+      "Update operation failed.  update cannot be supported for 1 to N mapping, as more than one " +
+      "value present for the update key")(exception1.getMessage)
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+  }
+
   test("update carbon table without alias in set columns") {
     sql("""drop table if exists iud.dest33""")
     sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index a187b6b..9e2627f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.strategy.MixedFormatHandler
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{ArrayType, LongType}
 import org.apache.spark.storage.StorageLevel
 
@@ -135,7 +136,24 @@ private[sql] case class CarbonProjectForUpdateCommand(
           else {
             Dataset.ofRows(sparkSession, plan)
           }
-
+          if (CarbonProperties.isUniqueValueCheckEnabled) {
+            // If more than one value present for the update key, should fail the update
+            val ds = dataSet.select(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
+              .groupBy(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
+              .count()
+              .select("count")
+              .filter(col("count") > lit(1))
+              .limit(1)
+              .collect()
+            // tupleId represents the source rows that are going to get replaced.
+            // If same tupleId appeared more than once means key has more than one value to replace.
+            // which is undefined behavior.
+            if (ds.length > 0 && ds(0).getLong(0) > 1) {
+              throw new UnsupportedOperationException(
+                " update cannot be supported for 1 to N mapping, as more than one value present " +
+                "for the update key")
+            }
+          }
           // handle the clean up of IUD.
           CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)