Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/04/09 07:13:03 UTC
[carbondata] branch master updated: [CARBONDATA-3763] Fix wrong insert result during insert stage command
This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 0f6dff4 [CARBONDATA-3763] Fix wrong insert result during insert stage command
0f6dff4 is described below
commit 0f6dff4a712714d8b15f7e9a75cd7e87e34c9d5e
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Fri Apr 3 20:21:52 2020 +0530
[CARBONDATA-3763] Fix wrong insert result during insert stage command
Why is this PR needed?
For CarbonInsertFromStageCommand, Spark reuses the same InternalRow instance because we transform twice: RDD[InternalRow] -> DataFrame -> logical plan -> RDD[InternalRow]. As a result, the same data gets written into other rows.
What changes were proposed in this PR?
Copy the InternalRow after the last transformation.
This closes #3694
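To illustrate the pitfall this commit fixes, here is a minimal sketch, not taken from the commit itself, of how Spark's reused InternalRow instances duplicate data when buffered without copy(); it assumes only a SparkSession named spark:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow

    val internalRdd: RDD[InternalRow] = spark.range(3).queryExecution.toRdd
    // Spark may back every row in a partition with the same mutable object,
    // so materializing the iterator can store N references to the last row.
    val reused = internalRdd.mapPartitions(iter => iter.toArray.iterator)
    // Copying each row detaches it from the shared buffer; this is what the
    // fix below does for the insert stage flow.
    val copied = internalRdd.mapPartitions(iter => iter.map(_.copy()).toArray.iterator)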
---
.../carbon/flink/TestCarbonPartitionWriter.scala | 3 +++
.../spark/rdd/CarbonDataRDDFactory.scala | 5 +++-
.../management/CarbonInsertFromStageCommand.scala | 7 +++--
.../command/management/CommonLoadUtils.scala | 30 +++++++++++++++++-----
.../constants/DataLoadProcessorConstants.java | 3 +++
5 files changed, 39 insertions(+), 9 deletions(-)
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index 6ca877c..73284ff 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -194,6 +194,9 @@ class TestCarbonPartitionWriter extends QueryTest {
checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
+ checkAnswer(sql(s"SELECT stringField FROM $tableName order by stringField limit 2"),
+ Seq(Row("test0"), Row("test1")))
+
val rows = sql(s"SELECT * FROM $tableName limit 1").collect()
assertResult(1)(rows.length)
assertResult(Array[Byte](2, 3, 4))(rows(0).get(rows(0).fieldIndex("binaryfield")).asInstanceOf[GenericRowWithSchema](0))
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index d57546c..8709d11 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -375,7 +375,10 @@ object CarbonDataRDDFactory {
.getFactTable
.getListOfColumns
.asScala.filterNot(col => col.isInvisible || col.getColumnName.contains("."))
- val convertedRdd = CommonLoadUtils.getConvertedInternalRow(colSchema, scanResultRdd.get)
+ val convertedRdd = CommonLoadUtils.getConvertedInternalRow(
+ colSchema,
+ scanResultRdd.get,
+ isInsertFromStageCommand = false)
if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
DataLoadProcessBuilderOnSpark.insertDataUsingGlobalSortWithInternalRow(sqlContext
.sparkSession,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index dd1efe8..c323f10 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusMan
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
@@ -352,8 +353,10 @@ case class CarbonInsertFromStageCommand(
CarbonInsertIntoCommand(
databaseNameOp = Option(table.getDatabaseName),
tableName = table.getTableName,
- options = scala.collection.immutable.Map("fileheader" -> header,
- "binary_decoder" -> "base64"),
+ options = scala.collection.immutable.Map(
+ "fileheader" -> header,
+ "binary_decoder" -> "base64",
+ DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND -> "true"),
isOverwriteTable = false,
logicalPlan = selectedDataFrame.queryExecution.analyzed,
tableInfo = table.getTableInfo,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 20222fe..a7cc48a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -551,7 +551,8 @@ object CommonLoadUtils {
curAttributes: Seq[AttributeReference],
sortScope: SortScopeOptions.SortScope,
table: CarbonTable,
- partition: Map[String, Option[String]]): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
+ partition: Map[String, Option[String]],
+ isInsertFromStageCommand: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
// keep partition column to end if exists
var colSchema = table.getTableInfo
.getFactTable
@@ -571,7 +572,10 @@ object CommonLoadUtils {
colSchema = colSchema.filterNot(x => x.isInvisible || x.getColumnName.contains(".") ||
x.getSchemaOrdinal == -1)
}
- val updatedRdd: RDD[InternalRow] = CommonLoadUtils.getConvertedInternalRow(colSchema, rdd)
+ val updatedRdd: RDD[InternalRow] = CommonLoadUtils.getConvertedInternalRow(
+ colSchema,
+ rdd,
+ isInsertFromStageCommand)
transformQuery(updatedRdd,
sparkSession,
loadModel,
@@ -722,8 +726,10 @@ object CommonLoadUtils {
}
}
- def getConvertedInternalRow(columnSchema: Seq[ColumnSchema],
- rdd: RDD[InternalRow]): RDD[InternalRow] = {
+ def getConvertedInternalRow(
+ columnSchema: Seq[ColumnSchema],
+ rdd: RDD[InternalRow],
+ isInsertFromStageCommand: Boolean): RDD[InternalRow] = {
// Converts the data as per the loading steps before giving it to the writer or sorter
var timeStampIndex = scala.collection.mutable.Set[Int]()
var dateIndex = scala.collection.mutable.Set[Int]()
@@ -740,7 +746,17 @@ object CommonLoadUtils {
}
i = i + 1
}
- val updatedRdd: RDD[InternalRow] = rdd.map { internalRow =>
+ val updatedRdd: RDD[InternalRow] = rdd.map { internalRowOriginal =>
+ val internalRow = if (isInsertFromStageCommand) {
+ // For the insert stage command, the logical plan already consists of a
+ // LogicalRDD of InternalRow. When it is converted to a DataFrame, Spark
+ // reuses the same InternalRow instance, so we need to copy it before the
+ // last transformation.
+ // TODO: even though copying the internalRow is fast, we should avoid it
+ // by finding a better way
+ internalRowOriginal.copy()
+ } else {
+ internalRowOriginal
+ }
for (index <- timeStampIndex) {
if (internalRow.getLong(index) == 0) {
internalRow.setNullAt(index)
@@ -961,7 +977,9 @@ object CommonLoadUtils {
attributes,
sortScope,
table,
- loadParams.finalPartition)
+ loadParams.finalPartition,
+ loadParams.optionsOriginal
+ .contains(DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND))
partitionsLen = partitions
persistedRDD = persistedRDDLocal
transformedPlan
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
index c7ef81b..c3cf1a4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
@@ -40,4 +40,7 @@ public final class DataLoadProcessorConstants {
// to indicate the optimized insert flow without rearranging each data row
public static final String NO_REARRANGE_OF_ROWS = "NO_REARRANGE_OF_ROWS";
+
+ // to indicate CarbonInsertFromStageCommand flow
+ public static final String IS_INSERT_STAGE_COMMAND = "IS_INSERT_STAGE_COMMAND";
}
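Putting the pieces together: the new flag travels with the load options. A condensed sketch, simplified from the diff above rather than a literal excerpt (header, loadParams, and rdd are the names used in the changed files):

    // CarbonInsertFromStageCommand marks the load as a stage insert.
    val options = scala.collection.immutable.Map(
      "fileheader" -> header,
      "binary_decoder" -> "base64",
      DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND -> "true")

    // CommonLoadUtils checks for the key's presence (its value is unused)
    // and copies each row only in the insert stage flow.
    val isInsertFromStageCommand =
      loadParams.optionsOriginal.contains(DataLoadProcessorConstants.IS_INSERT_STAGE_COMMAND)
    val updatedRdd = rdd.map { row =>
      if (isInsertFromStageCommand) row.copy() else row
    }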