You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/02/01 13:12:41 UTC
carbondata git commit: [CARBONDATA-2094] Filter DataMap Tables in
Show Table Command
Repository: carbondata
Updated Branches:
refs/heads/master 19fdd4d75 -> ee1c4d42f
[CARBONDATA-2094] Filter DataMap Tables in Show Table Command
Currently the SHOW TABLES command lists datamap tables (aggregate tables), but it should not show aggregate tables. Solution: handle the SHOW TABLES command on the Carbon side, filter out the datamap tables, and return the remaining tables.
This closes #1089
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ee1c4d42
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ee1c4d42
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ee1c4d42
Branch: refs/heads/master
Commit: ee1c4d42fc0837e515ac222c676bd46fe93795d5
Parents: 19fdd4d
Author: BJangir <ba...@gmail.com>
Authored: Mon Jan 29 23:46:56 2018 +0530
Committer: kumarvishal <ku...@gmail.com>
Committed: Thu Feb 1 18:42:05 2018 +0530
----------------------------------------------------------------------
.../preaggregate/TestPreAggCreateCommand.scala | 36 +++++++++
.../preaggregate/TestPreAggregateDrop.scala | 9 ++-
.../command/table/CarbonShowTablesCommand.scala | 82 ++++++++++++++++++++
.../spark/sql/hive/CarbonSessionState.scala | 11 ++-
.../spark/sql/hive/CarbonSessionState.scala | 11 ++-
5 files changed, 142 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 23132de..f1d7396 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -233,6 +233,20 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
}
val timeSeries = TIMESERIES.toString
+  test("remove preaggregate and timeseries agg tables from show table command") {
+    sql("DROP TABLE IF EXISTS tbl_1")
+    sql("DROP TABLE IF EXISTS sparktable")
+    sql("create table if not exists tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ")
+    sql("create table if not exists sparktable(a int,b string)")
+    sql("create datamap preagg_sum on table tbl_1 using 'preaggregate' as select mac,avg(age) from tbl_1 group by mac")
+    sql("create datamap agg2 on table tbl_1 using 'preaggregate' DMPROPERTIES " +
+      "('timeseries.eventTime'='prodate', 'timeseries.hierarchy'='hour=1,day=1,month=1,year=1') " +
+      "as select prodate,mac from tbl_1 group by prodate,mac")
+    // Datamap child tables must be hidden; the parent and plain spark tables stay listed.
+    val showTables = sql("show tables")
+    checkExistence(showTables, false, "tbl_1_preagg_sum","tbl_1_agg2_day","tbl_1_agg2_hour","tbl_1_agg2_month","tbl_1_agg2_year")
+    checkExistence(showTables, true, "tbl_1", "sparktable")
+  }
test("test pre agg create table 21: create with preaggregate and hierarchy") {
sql("DROP TABLE IF EXISTS maintabletime")
@@ -287,6 +301,28 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
}
+  test("show tables hides preaggregate table but lists parent and spark tables") {
+    sql("DROP TABLE IF EXISTS tbl_1")
+    sql("create table if not exists tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ")
+    sql("create datamap agg1 on table tbl_1 using 'preaggregate' as select mac, sum(age) from tbl_1 group by mac")
+    sql("create table if not exists sparktable(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) ")
+    checkExistence(sql("show tables"), false, "tbl_1_agg1") // datamap child table is hidden
+    checkExistence(sql("show tables"), true, "sparktable", "tbl_1")
+  }
+
+
+  test("remove TimeSeries agg tables from show table command") {
+    sql("DROP TABLE IF EXISTS tbl_1")
+    sql("create table if not exists tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ")
+    sql("create datamap agg2 on table tbl_1 using 'preaggregate' DMPROPERTIES " +
+      "('timeseries.eventTime'='prodate', 'timeseries.hierarchy'='hour=1,day=1,month=1,year=1') " +
+      "as select prodate,mac from tbl_1 group by prodate,mac")
+    val rollups = Seq("day", "hour", "month", "year").map(level => s"tbl_1_agg2_$level") // per level
+    checkExistence(sql("show tables"), false, rollups: _*)
+  }
+
+
+
def getCarbontable(plan: LogicalPlan) : CarbonTable ={
var carbonTable : CarbonTable = null
plan.transform {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
index 1138adf..911a725 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -46,8 +46,9 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
" a,sum(c) from maintable group by a")
sql("drop datamap if exists preagg2 on table maintable")
val showTables = sql("show tables")
+ val showdatamaps =sql("show datamap on table maintable")
checkExistence(showTables, false, "maintable_preagg2")
- checkExistence(showTables, true, "maintable_preagg1")
+ checkExistence(showdatamaps, true, "maintable_preagg1")
}
test("drop datamap which is not existed") {
@@ -66,8 +67,9 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
sql("drop datamap preagg_same on table maintable")
var showTables = sql("show tables")
+ val showdatamaps =sql("show datamap on table maintable1")
checkExistence(showTables, false, "maintable_preagg_same")
- checkExistence(showTables, true, "maintable1_preagg_same")
+ checkExistence(showdatamaps, true, "maintable1_preagg_same")
sql("drop datamap preagg_same on table maintable1")
showTables = sql("show tables")
checkExistence(showTables, false, "maintable1_preagg_same")
@@ -84,7 +86,8 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
sql("create datamap preagg_same1 on table maintable using 'preaggregate' as select" +
" a,sum(c) from maintable group by a")
showTables = sql("show tables")
- checkExistence(showTables, true, "maintable_preagg_same1")
+ val showdatamaps =sql("show datamap on table maintable")
+ checkExistence(showdatamaps, true, "maintable_preagg_same1")
sql("drop datamap preagg_same1 on table maintable")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
new file mode 100644
index 0000000..c2a91d8
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.types.{BooleanType, StringType}
+
+
+private[sql] case class CarbonShowTablesCommand(databaseName: Option[String],
+    tableIdentifierPattern: Option[String]) extends MetadataCommand {
+
+  // Same schema as Spark's ShowTablesCommand: database, tableName and isTemporary.
+  override val output: Seq[Attribute] = {
+    AttributeReference("database", StringType, nullable = false)() ::
+    AttributeReference("tableName", StringType, nullable = false)() ::
+    AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
+  }
+
+  /**
+   * Lists the tables of `databaseName` (or of the current database when it is not given),
+   * optionally matching `tableIdentifierPattern`, with datamap child tables removed.
+   * @param sparkSession active session
+   * @return one Row(database, tableName, isTemporary) per remaining table
+   */
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    // listTables is called directly because a Seq of rows has to be returned.
+    val catalog = sparkSession.sessionState.catalog
+    val db = databaseName.getOrElse(catalog.getCurrentDatabase)
+    val tables =
+      tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
+    filterDataMaps(tables, sparkSession).map { tableIdent =>
+      val isTemp = catalog.isTemporaryTable(tableIdent)
+      // Temporary views belong to no database: report "" like Spark does, not "default".
+      Row(tableIdent.database.getOrElse(""), tableIdent.table, isTemp)
+    }
+  }
+
+  /**
+   * Removes the child tables that back datamaps (e.g. pre-aggregate/timeseries tables).
+   * Temporary views are not looked up in the carbon metastore and are always kept.
+   *
+   * @param tables       table identifiers returned by the session catalog
+   * @param sparkSession active session
+   * @return `tables` minus the datamap child tables, in the original order
+   */
+  private def filterDataMaps(tables: Seq[TableIdentifier],
+      sparkSession: SparkSession): Seq[TableIdentifier] = {
+    val catalog = sparkSession.sessionState.catalog
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    // TODO(CARBONDATA-2103): tableExists + getCarbonTable are 2 lookups per table; make it 1
+    val dataMapTables: Set[(String, String)] = tables
+      .filter(t => !catalog.isTemporaryTable(t) && metastore.tableExists(t)(sparkSession))
+      .flatMap { t =>
+        val parent = CarbonEnv.getCarbonTable(t.database, t.table)(sparkSession)
+        Option(parent.getTableInfo.getDataMapSchemaList).toSeq.flatMap(_.asScala)
+      }
+      .flatMap(schema => Option(schema.getRelationIdentifier))
+      .map(rel => (rel.getDatabaseName, rel.getTableName))
+      .toSet
+    tables.filter(t => t.database.forall(d => !dataMapTables.contains((d, t.table))))
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 0fe0f96..0b62e10 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -22,11 +22,12 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionRes
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.ParserUtils._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, _}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, ShowTablesContext}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
+import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
@@ -336,4 +337,10 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
super.visitCreateTable(ctx)
}
}
+
+  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = withOrigin(ctx) {
+    // Route SHOW TABLES to carbon's command so that datamap child tables are hidden.
+    val (db, pattern) = (Option(ctx.db).map(_.getText), Option(ctx.pattern).map(string))
+    CarbonShowTablesCommand(databaseName = db, tableIdentifierPattern = pattern)
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index 3c151f0..baadd04 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -27,12 +27,13 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.ParserUtils.string
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand
import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
@@ -395,4 +396,10 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
super.visitCreateTable(ctx)
}
+
+  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = withOrigin(ctx) {
+    // Route SHOW TABLES to carbon's command so that datamap child tables are hidden.
+    val (db, pattern) = (Option(ctx.db).map(_.getText), Option(ctx.pattern).map(string))
+    CarbonShowTablesCommand(databaseName = db, tableIdentifierPattern = pattern)
+  }
}