You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/04/06 04:49:19 UTC
[1/2] incubator-carbondata git commit: Insert Select Into Same Table
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 950a6d0f5 -> ada081d89
Insert Select Into Same Table
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f527d3d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f527d3d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f527d3d2
Branch: refs/heads/master
Commit: f527d3d2e460076705834393182199c589ed318e
Parents: 950a6d0
Author: sounakr <so...@gmail.com>
Authored: Mon Apr 3 12:48:20 2017 +0530
Committer: sounakr <so...@gmail.com>
Committed: Wed Apr 5 23:52:20 2017 +0530
----------------------------------------------------------------------
.../InsertIntoCarbonTableTestCase.scala | 28 +++++++++
.../spark/sql/hive/CarbonAnalysisRules.scala | 43 +++++++-------
.../spark/sql/CarbonCatalystOperators.scala | 19 +++++++
.../sql/CarbonDatasourceHadoopRelation.scala | 13 +----
.../sql/execution/command/DDLStrategy.scala | 5 +-
.../execution/command/carbonTableSchema.scala | 3 +-
.../spark/sql/hive/CarbonAnalysisRules.scala | 60 ++++++++++++++++++++
.../spark/sql/hive/CarbonSessionState.scala | 30 +++++++++-
8 files changed, 163 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index e84e62a..0b491bf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -210,6 +210,32 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
)
}
+ test("insert select from same table") {
+ val timeStampPropOrig = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+
+ sql("drop table if exists CarbonDest")
+ sql("create table CarbonDest (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format'")
+
+ sql("drop table if exists HiveDest")
+ sql("create table HiveDest (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+
+ sql("insert into CarbonDest select * from THive")
+ sql("insert into CarbonDest select * from CarbonDest")
+
+ sql("insert into HiveDest select * from THive")
+ sql("insert into HiveDest select * from HiveDest")
+
+ checkAnswer(
+ sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from HiveDest order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
+ sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from CarbonDest order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+ )
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeStampPropOrig)
+ }
+
+
+
override def afterAll {
sql("drop table if exists load")
sql("drop table if exists inser")
@@ -219,5 +245,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists TCarbonSource")
sql("drop table if exists loadtable")
sql("drop table if exists insertTable")
+ sql("drop table if exists CarbonDest")
+ sql("drop table if exists HiveDest")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index f22e958..d23b18f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -33,31 +33,32 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
* Insert into carbon table from other source
*/
object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
- // Wait until children are resolved.
- case p: LogicalPlan if !p.childrenResolved => p
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
+ // Wait until children are resolved.
+ case p: LogicalPlan if !p.childrenResolved => p
- case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _)
- if relation.relation.isInstanceOf[CarbonDatasourceRelation] =>
- castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child)
- }
+ case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _)
+ if relation.relation.isInstanceOf[CarbonDatasourceRelation] =>
+ castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child)
+ }
- def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan)
- : LogicalPlan = {
- if (relation.carbonRelation.output.size > CarbonCommonConstants
- .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
- sys
- .error("Maximum supported column by carbon is:" + CarbonCommonConstants
- .DEFAULT_MAX_NUMBER_OF_COLUMNS
- )
- }
- if (child.output.size >= relation.carbonRelation.output.size ) {
- InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
- } else {
- sys.error("Cannot insert into target table because column number are different")
- }
+ def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan)
+ : LogicalPlan = {
+ if (relation.carbonRelation.output.size > CarbonCommonConstants
+ .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
+ sys
+ .error("Maximum supported column by carbon is:" + CarbonCommonConstants
+ .DEFAULT_MAX_NUMBER_OF_COLUMNS
+ )
+ }
+ if (child.output.size >= relation.carbonRelation.output.size ) {
+ InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
+ } else {
+ sys.error("Cannot insert into target table because column number are different")
}
}
+}
+
object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index d94489b..4070088 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -75,3 +75,22 @@ case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier)
override def output: Seq[AttributeReference] =
Seq(AttributeReference("result", StringType, nullable = false)())
}
+
+/**
+ * A logical plan representing insertion into Hive table
+ * This plan ignores nullability of ArrayType, MapType, StructType unlike InsertIntoTable
+ * because Hive Table doesn't have nullability for ARRAY, MAP,STRUCT types.
+ */
+case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation,
+ partition: Map[String, Option[String]],
+ child: LogicalPlan,
+ overwrite: OverwriteOptions,
+ ifNotExists: Boolean)
+ extends Command {
+
+ override def output: Seq[Attribute] = Seq.empty
+
+ // This is the expected schema of the table prepared to be inserted into
+ // including dynamic partition columns.
+ val tableOutput = table.carbonRelation.output
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 2743e7e..4169ac3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -43,7 +43,7 @@ case class CarbonDatasourceHadoopRelation(
parameters: Map[String, String],
tableSchema: Option[StructType],
isSubquery: ArrayBuffer[Boolean] = new ArrayBuffer[Boolean]())
- extends BaseRelation with InsertableRelation {
+ extends BaseRelation {
lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
@@ -74,15 +74,4 @@ case class CarbonDatasourceHadoopRelation(
}
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
- override def insert(data: DataFrame, overwrite: Boolean): Unit = {
- if (carbonRelation.output.size > CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS) {
- sys.error("Maximum supported column by carbon is:" +
- CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
- }
- if(data.logicalPlan.output.size >= carbonRelation.output.size) {
- LoadTableByInsert(this, data.logicalPlan).run(sparkSession)
- } else {
- sys.error("Cannot insert into target table because column number are different")
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 2916a9f..55148d2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.{CarbonEnv, ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable, ShowLoadsCommand, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -54,6 +54,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
identifier.table.toLowerCase)) :: Nil
case ShowLoadsCommand(databaseName, table, limit) =>
ExecutedCommandExec(ShowLoads(databaseName, table.toLowerCase, limit, plan.output)) :: Nil
+ case InsertIntoCarbonTable(relation: CarbonDatasourceHadoopRelation,
+ _, child: LogicalPlan, _, _) =>
+ ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil
case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName)
ExecutedCommandExec(createDb) :: Nil
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 4bd0564..77a0d90 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -280,7 +280,8 @@ object LoadTable {
}
-case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan) {
+case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan)
+ extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def run(sparkSession: SparkSession): Seq[Row] = {
val df = Dataset.ofRows(sparkSession, child)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
new file mode 100644
index 0000000..45accac
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+
+/**
+ * Insert into carbon table from other source
+ */
+object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transform {
+ // Wait until children are resolved.
+ case p: LogicalPlan if !p.childrenResolved => p
+
+ case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
+ if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation], child)
+ }
+ }
+
+ def castChildOutput(p: InsertIntoTable,
+ relation: CarbonDatasourceHadoopRelation,
+ child: LogicalPlan)
+ : LogicalPlan = {
+ if (relation.carbonRelation.output.size > CarbonCommonConstants
+ .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
+ sys
+ .error("Maximum supported column by carbon is:" + CarbonCommonConstants
+ .DEFAULT_MAX_NUMBER_OF_COLUMNS
+ )
+ }
+ if (child.output.size >= relation.carbonRelation.output.size) {
+ InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
+ } else {
+ sys.error("Cannot insert into target table because column number are different")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index d81fc09..38c7f34 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -16,18 +16,22 @@
*/
package org.apache.spark.sql.hive
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods, SparkSession}
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
-import org.apache.spark.sql.execution.{CarbonLateDecodeStrategy, SparkOptimizer}
+import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
import org.apache.spark.sql.execution.command.DDLStrategy
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
import org.apache.spark.sql.parser.CarbonSparkSqlParser
+import org.apache.spark.sql.SparkSession
/**
* Session state implementation to override sql parser and adding strategies
@@ -42,6 +46,26 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+ override lazy val analyzer: Analyzer = {
+ new Analyzer(catalog, conf) {
+ override val extendedResolutionRules =
+ catalog.ParquetConversions ::
+ catalog.OrcConversions ::
+ CarbonPreInsertionCasts ::
+ AnalyzeCreateTable(sparkSession) ::
+ PreprocessTableInsertion(conf) ::
+ DataSourceAnalysis(conf) ::
+ (if (conf.runSQLonFile) {
+ new ResolveDataSource(sparkSession) :: Nil
+ } else {
+ Nil
+ })
+
+ override val extendedCheckRules = Seq(
+ PreWriteCheck(conf, catalog))
+ }
+ }
}
class CarbonOptimizer(
[2/2] incubator-carbondata git commit: [CARBONDATA-845] Insert Select
Into Same Table This closes #723
Posted by ra...@apache.org.
[CARBONDATA-845] Insert Select Into Same Table This closes #723
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ada081d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ada081d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ada081d8
Branch: refs/heads/master
Commit: ada081d89be67ad28c31401acb71a84e3e97244a
Parents: 950a6d0 f527d3d
Author: ravipesala <ra...@gmail.com>
Authored: Thu Apr 6 10:18:50 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 6 10:18:50 2017 +0530
----------------------------------------------------------------------
.../InsertIntoCarbonTableTestCase.scala | 28 +++++++++
.../spark/sql/hive/CarbonAnalysisRules.scala | 43 +++++++-------
.../spark/sql/CarbonCatalystOperators.scala | 19 +++++++
.../sql/CarbonDatasourceHadoopRelation.scala | 13 +----
.../sql/execution/command/DDLStrategy.scala | 5 +-
.../execution/command/carbonTableSchema.scala | 3 +-
.../spark/sql/hive/CarbonAnalysisRules.scala | 60 ++++++++++++++++++++
.../spark/sql/hive/CarbonSessionState.scala | 30 +++++++++-
8 files changed, 163 insertions(+), 38 deletions(-)
----------------------------------------------------------------------