You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/04/09 07:13:03 UTC

[carbondata] branch master updated: [CARBONDATA-3763] Fix wrong insert result during insert stage command

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f6dff4  [CARBONDATA-3763] Fix wrong insert result during insert stage command
0f6dff4 is described below

commit 0f6dff4a712714d8b15f7e9a75cd7e87e34c9d5e
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Fri Apr 3 20:21:52 2020 +0530

    [CARBONDATA-3763] Fix wrong insert result during insert stage command
    
    Why is this PR needed?
    For insertStageCommand, spark is reusing the internalRow as two times we transform from RDD[InternalRow] -> dataframe -> logical Plan -> RDD[InternalRow]. So, same data is inserted on other rows
    
    What changes were proposed in this PR?
    Copy the internalRow after the last transform.
    
    This closes #3694
---
 .../carbon/flink/TestCarbonPartitionWriter.scala   |  3 +++
 .../spark/rdd/CarbonDataRDDFactory.scala           |  5 +++-
 .../management/CarbonInsertFromStageCommand.scala  |  7 +++--
 .../command/management/CommonLoadUtils.scala       | 30 +++++++++++++++++-----
 .../constants/DataLoadProcessorConstants.java      |  3 +++
 5 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index 6ca877c..73284ff 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -194,6 +194,9 @@ class TestCarbonPartitionWriter extends QueryTest {
 
       checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
 
+      checkAnswer(sql(s"SELECT stringField FROM $tableName order by stringField limit 2"),
+        Seq(Row("test0"), Row("test1")))
+
       val rows = sql(s"SELECT * FROM $tableName limit 1").collect()
       assertResult(1)(rows.length)
       assertResult(Array[Byte](2, 3, 4))(rows(0).get(rows(0).fieldIndex("binaryfield")).asInstanceOf[GenericRowWithSchema](0))
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index d57546c..8709d11 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -375,7 +375,10 @@ object CarbonDataRDDFactory {
               .getFactTable
               .getListOfColumns
               .asScala.filterNot(col => col.isInvisible || col.getColumnName.contains("."))
-            val convertedRdd = CommonLoadUtils.getConvertedInternalRow(colSchema, scanResultRdd.get)
+            val convertedRdd = CommonLoadUtils.getConvertedInternalRow(
+              colSchema,
+              scanResultRdd.get,
+              isInsertFromStageCommand = false)
             if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
               DataLoadProcessBuilderOnSpark.insertDataUsingGlobalSortWithInternalRow(sqlContext
                 .sparkSession,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index dd1efe8..c323f10 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusMan
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
@@ -352,8 +353,10 @@ case class CarbonInsertFromStageCommand(
         CarbonInsertIntoCommand(
           databaseNameOp = Option(table.getDatabaseName),
           tableName = table.getTableName,
-          options = scala.collection.immutable.Map("fileheader" -> header,
-            "binary_decoder" -> "base64"),
+          options = scala.collection.immutable.Map(
+            "fileheader" -> header,
+            "binary_decoder" -> "base64",
+            DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND -> "true"),
           isOverwriteTable = false,
           logicalPlan = selectedDataFrame.queryExecution.analyzed,
           tableInfo = table.getTableInfo,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 20222fe..a7cc48a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -551,7 +551,8 @@ object CommonLoadUtils {
       curAttributes: Seq[AttributeReference],
       sortScope: SortScopeOptions.SortScope,
       table: CarbonTable,
-      partition: Map[String, Option[String]]): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
+      partition: Map[String, Option[String]],
+      isInsertFromStageCommand: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
     // keep partition column to end if exists
     var colSchema = table.getTableInfo
       .getFactTable
@@ -571,7 +572,10 @@ object CommonLoadUtils {
       colSchema = colSchema.filterNot(x => x.isInvisible || x.getColumnName.contains(".") ||
                                            x.getSchemaOrdinal == -1)
     }
-    val updatedRdd: RDD[InternalRow] = CommonLoadUtils.getConvertedInternalRow(colSchema, rdd)
+    val updatedRdd: RDD[InternalRow] = CommonLoadUtils.getConvertedInternalRow(
+      colSchema,
+      rdd,
+      isInsertFromStageCommand)
     transformQuery(updatedRdd,
       sparkSession,
       loadModel,
@@ -722,8 +726,10 @@ object CommonLoadUtils {
     }
   }
 
-  def getConvertedInternalRow(columnSchema: Seq[ColumnSchema],
-      rdd: RDD[InternalRow]): RDD[InternalRow] = {
+  def getConvertedInternalRow(
+      columnSchema: Seq[ColumnSchema],
+      rdd: RDD[InternalRow],
+      isInsertFromStageCommand: Boolean): RDD[InternalRow] = {
     // Converts the data as per the loading steps before give it to writer or sorter
     var timeStampIndex = scala.collection.mutable.Set[Int]()
     var dateIndex = scala.collection.mutable.Set[Int]()
@@ -740,7 +746,17 @@ object CommonLoadUtils {
       }
       i = i + 1
     }
-    val updatedRdd: RDD[InternalRow] = rdd.map { internalRow =>
+    val updatedRdd: RDD[InternalRow] = rdd.map { internalRowOriginal =>
+      val internalRow = if (isInsertFromStageCommand) {
+        // Insert stage command, logical plan already consist of LogicalRDD of internalRow.
+        // When it is converted to DataFrame, spark is reusing the same internalRow.
+        // So, need to have a copy before the last transformation.
+        // TODO: Even though copying internalRow is faster, we should avoid it
+        //  by finding a better way
+        internalRowOriginal.copy()
+      } else {
+        internalRowOriginal
+      }
       for (index <- timeStampIndex) {
         if (internalRow.getLong(index) == 0) {
           internalRow.setNullAt(index)
@@ -961,7 +977,9 @@ object CommonLoadUtils {
               attributes,
               sortScope,
               table,
-              loadParams.finalPartition)
+              loadParams.finalPartition,
+              loadParams.optionsOriginal
+                .contains(DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND))
           partitionsLen = partitions
           persistedRDD = persistedRDDLocal
           transformedPlan
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
index c7ef81b..c3cf1a4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
@@ -40,4 +40,7 @@ public final class DataLoadProcessorConstants {
 
   // to indicate that it is optimized insert flow without rearrange of each data rows
   public static final String NO_REARRANGE_OF_ROWS = "NO_REARRANGE_OF_ROWS";
+
+  // to indicate CarbonInsertFromStageCommand flow
+  public static final String IS_INSERT_STAGE_COMMAND = "IS_INSERT_STAGE_COMMAND";
 }