You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/12/24 08:34:10 UTC
carbondata git commit: [CARBONDATA-3184]Fix DataLoad Failure with
'using carbondata'
Repository: carbondata
Updated Branches:
refs/heads/master f4c1c672b -> 08553618d
[CARBONDATA-3184]Fix DataLoad Failure with 'using carbondata'
Problem
When the CarbonSession is initialized with a store path that differs from the metastore path, creating a table with 'USING carbondata' and then loading it via the LOAD DATA DDL fails, because the load attempts to read the schema from the locationUri, which points to the warehouse path rather than the table path.
Solution:
Set the storage location to the table path whenever the location and the table path differ.
This closes #2998
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/08553618
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/08553618
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/08553618
Branch: refs/heads/master
Commit: 08553618d575da062a2959424ce731e101899696
Parents: f4c1c67
Author: Indhumathi27 <in...@gmail.com>
Authored: Tue Dec 18 16:08:42 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Dec 24 14:03:56 2018 +0530
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/CarbonSource.scala | 3 ++-
.../apache/spark/sql/CarbonToSparkAdapater.scala | 9 +++++++++
.../apache/spark/sql/CarbonToSparkAdapater.scala | 9 +++++++++
.../apache/spark/sql/CarbonToSparkAdapater.scala | 9 +++++++++
.../spark/util/AllDictionaryTestCase.scala | 16 ++++++++++++++--
5 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/08553618/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 9899e8b..7f72d42 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -331,7 +331,8 @@ object CarbonSource {
properties,
query)
// updating params
- val updatedFormat = storageFormat.copy(properties = map)
+ val updatedFormat = CarbonToSparkAdapater
+ .getUpdatedStorageFormat(storageFormat, map, tablePath)
tableDesc.copy(storage = updatedFormat)
} else {
val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/08553618/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
index d5fe6a4..52c27ee 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -18,8 +18,11 @@
package org.apache.spark.sql
+import java.net.URI
+
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
@@ -72,4 +75,10 @@ object CarbonToSparkAdapater {
ExpressionSet(filterPredicates)
.filter(_.references.subsetOf(partitionSet)))
}
+
+ def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+ map: Map[String, String],
+ tablePath: String): CatalogStorageFormat = {
+ storageFormat.copy(properties = map, locationUri = Some(tablePath))
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/08553618/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
index 7a68e3e..244b097 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -18,8 +18,11 @@
package org.apache.spark.sql
+import java.net.URI
+
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
@@ -79,4 +82,10 @@ object CarbonToSparkAdapater {
def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
Seq(OptimizeCodegen(conf))
}
+
+ def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+ map: Map[String, String],
+ tablePath: String): CatalogStorageFormat = {
+ storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/08553618/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
index cec4c36..4c4ce4d 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -18,8 +18,11 @@
package org.apache.spark.sql
+import java.net.URI
+
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -81,4 +84,10 @@ object CarbonToSparkAdapater {
def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
Seq.empty
}
+
+ def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+ map: Map[String, String],
+ tablePath: String): CatalogStorageFormat = {
+ storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/08553618/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index bceb0fc..58e5665 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -18,7 +18,7 @@ package org.apache.carbondata.spark.util
import org.apache.spark.sql.common.util.Spark2QueryTest
import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -82,6 +82,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
override def beforeAll {
sql("drop table if exists sample")
sql("drop table if exists complextypes")
+ sql("drop table if exists tabletest")
buildTestData
// second time comment this line
buildTable
@@ -167,9 +168,20 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
DictionaryTestCaseUtil.
checkDictionary(complexRelation, "channelsId", "1650")
}
-
+
+ test("test create table thorugh 'using carbondata' and load data") {
+ sql(
+ "CREATE TABLE tabletest (empno INT, workgroupcategory STRING, deptno INT, projectcode INT, " +
+ "attendance INT) USING carbondata")
+ sql(
+ s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE tabletest OPTIONS
+ |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""".stripMargin)
+ checkAnswer(sql("select count(*) from tabletest"), Row(10))
+ }
+
override def afterAll {
sql("drop table sample")
sql("drop table complextypes")
+ sql("drop table tabletest")
}
}