Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/12/24 08:34:10 UTC

carbondata git commit: [CARBONDATA-3184] Fix DataLoad Failure with 'using carbondata'

Repository: carbondata
Updated Branches:
  refs/heads/master f4c1c672b -> 08553618d


[CARBONDATA-3184] Fix DataLoad Failure with 'using carbondata'

Problem:
When CarbonSession is initialized with a store path that differs from the metastore path, creating a table through 'USING carbondata' and then loading it with the LOAD DATA DDL fails, because the load tries to read the table schema from the catalog's locationUri, which still points to the warehouse path instead of the table path.

Solution:
While updating the storage format, set its location to the table path whenever the two differ, so the schema is always resolved from the table path.
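
For reference, a minimal reproduction sketch (table name and CSV path are hypothetical; it mirrors the test added in this commit), assuming a CarbonSession built with distinct store and metastore paths:

    // Failed before this fix when storePath != metastorePath:
    // the schema lookup resolved locationUri to the warehouse path.
    spark.sql("CREATE TABLE t (id INT) USING carbondata")
    spark.sql("LOAD DATA LOCAL INPATH '/tmp/data.csv' INTO TABLE t")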

This closes #2998


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/08553618
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/08553618
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/08553618

Branch: refs/heads/master
Commit: 08553618d575da062a2959424ce731e101899696
Parents: f4c1c67
Author: Indhumathi27 <in...@gmail.com>
Authored: Tue Dec 18 16:08:42 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Dec 24 14:03:56 2018 +0530

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/CarbonSource.scala   |  3 ++-
 .../apache/spark/sql/CarbonToSparkAdapater.scala    |  9 +++++++++
 .../apache/spark/sql/CarbonToSparkAdapater.scala    |  9 +++++++++
 .../apache/spark/sql/CarbonToSparkAdapater.scala    |  9 +++++++++
 .../spark/util/AllDictionaryTestCase.scala          | 16 ++++++++++++++--
 5 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/08553618/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 9899e8b..7f72d42 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -331,7 +331,8 @@ object CarbonSource {
         properties,
         query)
       // updating params
-      val updatedFormat = storageFormat.copy(properties = map)
+      val updatedFormat = CarbonToSparkAdapater
+        .getUpdatedStorageFormat(storageFormat, map, tablePath)
       tableDesc.copy(storage = updatedFormat)
     } else {
       val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)

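For context, the adapter indirection is needed because CatalogStorageFormat.locationUri changed type across Spark versions: it is Option[String] in Spark 2.1 and Option[URI] from Spark 2.2 onward, so the copy cannot live in the shared CarbonSource code. A side-by-side sketch of the copy each version-specific adapter performs, as the hunks below show:

    // Spark 2.1: locationUri is Option[String]
    storageFormat.copy(properties = map, locationUri = Some(tablePath))
    // Spark 2.2 / 2.3: locationUri is Option[URI]
    storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
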
http://git-wip-us.apache.org/repos/asf/carbondata/blob/08553618/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
index d5fe6a4..52c27ee 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -18,8 +18,11 @@
 
 package org.apache.spark.sql
 
+import java.net.URI
+
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
@@ -72,4 +75,10 @@ object CarbonToSparkAdapater {
       ExpressionSet(filterPredicates)
         .filter(_.references.subsetOf(partitionSet)))
   }
+
+  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+      map: Map[String, String],
+      tablePath: String): CatalogStorageFormat = {
+    storageFormat.copy(properties = map, locationUri = Some(tablePath))
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/08553618/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
index 7a68e3e..244b097 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -18,8 +18,11 @@
 
 package org.apache.spark.sql
 
+import java.net.URI
+
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
@@ -79,4 +82,10 @@ object CarbonToSparkAdapater {
   def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
     Seq(OptimizeCodegen(conf))
   }
+
+  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+      map: Map[String, String],
+      tablePath: String): CatalogStorageFormat = {
+    storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/08553618/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
index cec4c36..4c4ce4d 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -18,8 +18,11 @@
 
 package org.apache.spark.sql
 
+import java.net.URI
+
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -81,4 +84,10 @@ object CarbonToSparkAdapater {
   def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
     Seq.empty
   }
+
+  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+      map: Map[String, String],
+      tablePath: String): CatalogStorageFormat = {
+    storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/08553618/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index bceb0fc..58e5665 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -18,7 +18,7 @@ package org.apache.carbondata.spark.util
 
 import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -82,6 +82,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
   override def beforeAll {
     sql("drop table if exists sample")
     sql("drop table if exists complextypes")
+    sql("drop table if exists tabletest")
     buildTestData
     // second time comment this line
     buildTable
@@ -167,9 +168,20 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     DictionaryTestCaseUtil.
       checkDictionary(complexRelation, "channelsId", "1650")
   }
-  
+
+  test("test create table thorugh 'using carbondata' and load data") {
+    sql(
+      "CREATE TABLE tabletest (empno INT, workgroupcategory STRING, deptno INT, projectcode INT, " +
+      "attendance INT) USING carbondata")
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE tabletest OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""".stripMargin)
+    checkAnswer(sql("select count(*) from tabletest"), Row(10))
+  }
+
   override def afterAll {
     sql("drop table sample")
     sql("drop table complextypes")
+    sql("drop table tabletest")
   }
 }