Posted to commits@carbondata.apache.org by ma...@apache.org on 2017/12/16 06:38:22 UTC

carbondata git commit: [CARBONDATA-1304] [IUD Bug] IUD with single pass

Repository: carbondata
Updated Branches:
  refs/heads/master eb7cf54ef -> 8bf72a6e0


[CARBONDATA-1304] [IUD Bug] IUD with single pass

The update operation on a carbon table fails when single pass loading is enabled, because the tupleId column is not arranged at the end of the row.

With single pass, the segment id should be extracted from the tupleId using the SegIdUDF function, and the resulting column should be arranged at the end.
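
A minimal sketch of the intended arrangement (assuming a DataFrame df that
already carries the implicit tupleId column; the helpers and constants are
the ones used in the diff below):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    import org.apache.spark.sql.functions.udf

    // extract only the segment id portion of the implicit tupleId
    val getSegIdUDF = udf((tupleId: String) =>
      CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))

    // keep every column except the tupleId itself ...
    val otherFields = df.schema.fields.toSeq
      .filterNot(_.name.equalsIgnoreCase(
        CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
      .map(field => new Column(field.name))

    // ... and append the derived segment id column at the end of the row
    val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute.quotedString(
      CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)))
      .as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
    val dataFrameWithTupleId = df.select((otherFields :+ segIdColumn): _*)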

This closes #1167


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8bf72a6e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8bf72a6e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8bf72a6e

Branch: refs/heads/master
Commit: 8bf72a6e0767c1cd24924753ed21c90ae1932a40
Parents: eb7cf54
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Tue Jul 11 22:20:05 2017 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Sat Dec 16 12:11:09 2017 +0530

----------------------------------------------------------------------
 .../iud/UpdateCarbonTableTestCase.scala         | 43 +++++++++++++++++-
 .../carbondata/spark/util/CarbonScalaUtil.scala | 17 +++++++
 .../spark/sql/test/TestQueryExecutor.scala      |  2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +
 .../management/CarbonLoadDataCommand.scala      | 48 ++++++++++++--------
 5 files changed, 90 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8bf72a6e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index d381a43..f265e75 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -18,7 +18,8 @@ package org.apache.carbondata.spark.testsuite.iud
 
 import org.apache.spark.sql.{Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
@@ -528,6 +529,46 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists preaggMain_preagg1")
   }
 
+  test("Update operation on carbon table with singlepass") {
+    sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=true""")
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""CREATE TABLE carbontable(id int, name string, city string, age int)
+         STORED BY 'org.apache.carbondata.format'""")
+    val testData = s"$resourcesPath/sample.csv"
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table carbontable")
+    // update operation
+    sql("""update carbon.carbontable d  set (d.id) = (d.id + 1) where d.id > 2""").show()
+    checkAnswer(
+      sql("select count(*) from carbontable"),
+      Seq(Row(6))
+    )
+    sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=false""")
+    sql("drop table carbontable")
+  }
+  test("Update operation on carbon table with persist false") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.isPersistEnabled, "false")
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""CREATE TABLE carbontable(id int, name string, city string, age int)
+         STORED BY 'org.apache.carbondata.format'""")
+    val testData = s"$resourcesPath/sample.csv"
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table carbontable")
+    // update operation
+    sql("""update carbon.carbontable d  set (d.id) = (d.id + 1) where d.id > 2""").show()
+    checkAnswer(
+      sql("select count(*) from carbontable"),
+      Seq(Row(6))
+    )
+    sql("drop table carbontable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.isPersistEnabled,
+        CarbonCommonConstants.defaultValueIsPersistEnabled)
+  }
+
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud cascade")
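
As an aside, single pass can also be requested per load through the LOAD DATA
options instead of the session property used in the new test above (a hedged
sketch; the table and path are illustrative):

    sql(
      s"""LOAD DATA LOCAL INPATH '$testData' INTO TABLE carbontable
         OPTIONS('SINGLE_PASS'='TRUE')""")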

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8bf72a6e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 4858dab..653e6f3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -222,4 +222,21 @@ object CarbonScalaUtil {
           } cannot be modified. Only Int and Decimal data types are allowed for modification")
     }
   }
+
+  /**
+   * Returns all fields except the tupleId field, as it is not required in the value.
+   *
+   * @param fields schema fields of the DataFrame being loaded
+   * @return columns for all fields excluding the implicit tupleId column
+   */
+  def getAllFieldsWithoutTupleIdField(fields: Array[StructField]): Seq[Column] = {
+    // getting all fields except tupleId field as it is not required in the value
+    val otherFields = fields.toSeq
+      .filter(field => !field.name
+        .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+      .map(field => {
+        new Column(field.name)
+      })
+    otherFields
+  }
 }
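
A brief usage sketch of the new helper (the DataFrame df is illustrative,
not part of the patch):

    // given a schema that still carries the implicit tupleId column,
    // the helper yields Columns for every field except that one
    val valueColumns = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(df.schema.fields)
    // this tupleId-free projection is what dictionary generation consumes
    val dataFrameWithoutTupleId = df.select(valueColumns: _*)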

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8bf72a6e/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index 9bde483..9e30b02 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -47,7 +47,7 @@ object TestQueryExecutor {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   val projectPath = new File(this.getClass.getResource("/").getPath + "../../../..")
-    .getCanonicalPath
+    .getCanonicalPath.replaceAll("\\\\", "/")
   LOGGER.info(s"project path: $projectPath")
   val integrationPath = s"$projectPath/integration"
   val metastoredb = s"$integrationPath/spark-common/target"
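
The replaceAll above normalizes Windows path separators to forward slashes,
so locations derived from projectPath (such as the database location used by
the new tests) stay consistent across platforms; a minimal illustration with
a hypothetical input:

    // on Windows, getCanonicalPath yields backslash-separated paths
    val raw = "C:\\work\\carbondata"
    val normalized = raw.replaceAll("\\\\", "/")
    // normalized == "C:/work/carbondata"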

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8bf72a6e/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7955e71..e7d10d9 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -421,6 +421,8 @@ object CarbonDataRDDFactory {
       } else {
         // in success case handle updation of the table status file.
         // success case.
+        // write the dictionary file when single_pass is true
+        writeDictionary(carbonLoadModel, result, false)
         val segmentDetails = new util.HashSet[String]()
         var resultSize = 0
         res.foreach { resultOfSeg =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8bf72a6e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 38c34dd..0affe79 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
 import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel}
 import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.{CausedBy, FileUtils}
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -47,7 +48,7 @@ import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
 
 case class CarbonLoadDataCommand(
     databaseNameOp: Option[String],
@@ -310,6 +311,11 @@ case class CarbonLoadDataCommand(
     } else {
       None
     }
+    val loadDataFrame = if (updateModel.isDefined) {
+      Some(getDataFrameWithTupleID())
+    } else {
+      dataFrame
+    }
     CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
       carbonLoadModel,
       columnar,
@@ -317,7 +323,7 @@ case class CarbonLoadDataCommand(
       server,
       isOverwriteTable,
       hadoopConf,
-      dataFrame,
+      loadDataFrame,
       updateModel,
       operationContext)
   }
@@ -330,27 +336,11 @@ case class CarbonLoadDataCommand(
       hadoopConf: Configuration,
       operationContext: OperationContext): Unit = {
     val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-      val fields = dataFrame.get.schema.fields
-      import org.apache.spark.sql.functions.udf
-      // extracting only segment from tupleId
-      val getSegIdUDF = udf((tupleId: String) =>
-        CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+      val dataFrameWithTupleId: DataFrame = getDataFrameWithTupleID()
       // getting all fields except tupleId field as it is not required in the value
-      var otherFields = fields.toSeq.filter { field =>
-        !field.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
-      }.map { field =>
-        new Column(field.name)
-      }
-
-      // extract tupleId field which will be used as a key
-      val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-        .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
-        as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+      val otherFields = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
       // use dataFrameWithoutTupleId as dictionaryDataFrame
       val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-      otherFields = otherFields :+ segIdColumn
-      // use dataFrameWithTupleId as loadDataFrame
-      val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
       (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
     } else {
       (dataFrame, dataFrame)
@@ -374,6 +364,24 @@ case class CarbonLoadDataCommand(
       operationContext)
   }
 
+  def getDataFrameWithTupleID(): DataFrame = {
+    val fields = dataFrame.get.schema.fields
+    import org.apache.spark.sql.functions.udf
+    // extracting only segment from tupleId
+    val getSegIdUDF = udf((tupleId: String) =>
+      CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+    // getting all fields except tupleId field as it is not required in the value
+    val otherFields = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(fields)
+    // extract tupleId field which will be used as a key
+    val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
+      .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
+      as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+    val fieldWithTupleId = otherFields :+ segIdColumn
+    // use dataFrameWithTupleId as loadDataFrame
+    val dataFrameWithTupleId = dataFrame.get.select(fieldWithTupleId: _*)
+    dataFrameWithTupleId
+  }
+
   private def updateTableMetadata(
       carbonLoadModel: CarbonLoadModel,
       sqlContext: SQLContext,