Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 19:43:21 UTC

[14/50] [abbrv] carbondata git commit: [CARBONDATA-2094] Filter DataMap Tables in Show Table Command

[CARBONDATA-2094] Filter DataMap Tables in Show Table Command

Currently the SHOW TABLES command also lists datamap tables (aggregate tables), but it should not show aggregate tables. Solution: handle the SHOW TABLES command on the Carbon side, filter out the datamap tables, and return the remaining tables.

This closes #1089
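
For illustration, a hedged sketch of the intended behavior, written in the same style as
the tests in the diff below (the table and datamap names here are made up for the example):

    sql("CREATE TABLE maintable(a INT, b STRING, c INT) STORED BY 'carbondata'")
    sql("CREATE DATAMAP preagg1 ON TABLE maintable USING 'preaggregate' " +
        "AS SELECT a, SUM(c) FROM maintable GROUP BY a")
    // SHOW TABLES lists the main table but no longer the backing aggregate table
    checkExistence(sql("SHOW TABLES"), true, "maintable")
    checkExistence(sql("SHOW TABLES"), false, "maintable_preagg1")
    // the datamap itself stays visible through SHOW DATAMAP
    checkExistence(sql("SHOW DATAMAP ON TABLE maintable"), true, "preagg1")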


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ee1c4d42
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ee1c4d42
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ee1c4d42

Branch: refs/heads/branch-1.3
Commit: ee1c4d42fc0837e515ac222c676bd46fe93795d5
Parents: 19fdd4d
Author: BJangir <ba...@gmail.com>
Authored: Mon Jan 29 23:46:56 2018 +0530
Committer: kumarvishal <ku...@gmail.com>
Committed: Thu Feb 1 18:42:05 2018 +0530

----------------------------------------------------------------------
 .../preaggregate/TestPreAggCreateCommand.scala  | 36 +++++++++
 .../preaggregate/TestPreAggregateDrop.scala     |  9 ++-
 .../command/table/CarbonShowTablesCommand.scala | 82 ++++++++++++++++++++
 .../spark/sql/hive/CarbonSessionState.scala     | 11 ++-
 .../spark/sql/hive/CarbonSessionState.scala     | 11 ++-
 5 files changed, 142 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 23132de..f1d7396 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -233,6 +233,20 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
   }
 
   val timeSeries = TIMESERIES.toString
+  test("remove agg tables from show table command") {
+    sql("DROP TABLE IF EXISTS tbl_1")
+    sql("DROP TABLE IF EXISTS sparktable")
+    sql("create table if not exists  tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ")
+    sql("create table if not exists sparktable(a int,b string)")
+    sql(
+      s"""create datamap preagg_sum on table tbl_1 using 'preaggregate' as select mac,avg(age) from tbl_1 group by mac"""
+        .stripMargin)
+    sql(
+      "create datamap agg2 on table tbl_1 using 'preaggregate' DMPROPERTIES ('timeseries" +
+      ".eventTime'='prodate', 'timeseries.hierarchy'='hour=1,day=1,month=1,year=1') as select prodate," +
+      "mac from tbl_1 group by prodate,mac")
+    checkExistence(sql("show tables"), false, "tbl_1_preagg_sum","tbl_1_agg2_day","tbl_1_agg2_hour","tbl_1_agg2_month","tbl_1_agg2_year")
+  }
 
   test("test pre agg  create table 21: create with preaggregate and hierarchy") {
     sql("DROP TABLE IF EXISTS maintabletime")
@@ -287,6 +301,28 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }
 
+  test("remove  agg tables from show table command") {
+    sql("DROP TABLE IF EXISTS tbl_1")
+    sql("create table if not exists  tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ")
+    sql("create datamap agg1 on table tbl_1 using 'preaggregate' as select mac, sum(age) from tbl_1 group by mac")
+    sql("create table if not exists  sparktable(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) ")
+    checkExistence(sql("show tables"), false, "tbl_1_agg1")
+    checkExistence(sql("show tables"), true, "sparktable","tbl_1")
+  }
+
+
+  test("remove TimeSeries agg tables from show table command") {
+    sql("DROP TABLE IF EXISTS tbl_1")
+    sql("create table if not exists  tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ")
+    sql(
+      "create datamap agg2 on table tbl_1 using 'preaggregate' DMPROPERTIES ('timeseries" +
+      ".eventTime'='prodate', 'timeseries.hierarchy'='hour=1,day=1,month=1,year=1') as select prodate," +
+      "mac from tbl_1 group by prodate,mac")
+    checkExistence(sql("show tables"), false, "tbl_1_agg2_day","tbl_1_agg2_hour","tbl_1_agg2_month","tbl_1_agg2_year")
+  }
+
+
+
   def getCarbontable(plan: LogicalPlan) : CarbonTable ={
     var carbonTable : CarbonTable = null
     plan.transform {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
index 1138adf..911a725 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -46,8 +46,9 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
       " a,sum(c) from maintable group by a")
     sql("drop datamap if exists preagg2 on table maintable")
     val showTables = sql("show tables")
+    val showdatamaps =sql("show datamap on table maintable")
     checkExistence(showTables, false, "maintable_preagg2")
-    checkExistence(showTables, true, "maintable_preagg1")
+    checkExistence(showdatamaps, true, "maintable_preagg1")
   }
 
   test("drop datamap which is not existed") {
@@ -66,8 +67,9 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
 
     sql("drop datamap preagg_same on table maintable")
     var showTables = sql("show tables")
+    val showdatamaps =sql("show datamap on table maintable1")
     checkExistence(showTables, false, "maintable_preagg_same")
-    checkExistence(showTables, true, "maintable1_preagg_same")
+    checkExistence(showdatamaps, true, "maintable1_preagg_same")
     sql("drop datamap preagg_same on table maintable1")
     showTables = sql("show tables")
     checkExistence(showTables, false, "maintable1_preagg_same")
@@ -84,7 +86,8 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
     sql("create datamap preagg_same1 on table maintable using 'preaggregate' as select" +
         " a,sum(c) from maintable group by a")
     showTables = sql("show tables")
-    checkExistence(showTables, true, "maintable_preagg_same1")
+    val showDatamaps = sql("show datamap on table maintable")
+    checkExistence(showDatamaps, true, "maintable_preagg_same1")
     sql("drop datamap preagg_same1 on table maintable")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
new file mode 100644
index 0000000..c2a91d8
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.types.{BooleanType, StringType}
+
+
+private[sql] case class CarbonShowTablesCommand(databaseName: Option[String],
+    tableIdentifierPattern: Option[String]) extends MetadataCommand {
+
+  // The result of SHOW TABLES has three columns: database, tableName and isTemporary.
+  override val output: Seq[Attribute] = {
+    AttributeReference("database", StringType, nullable = false)() ::
+    AttributeReference("tableName", StringType, nullable = false)() ::
+    AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
+  }
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    // Since we need to return a Seq of rows, we call the catalog's listTables directly
+    // instead of going through sparkSession.catalog.
+    // filterDataMaps removes the datamap (aggregate) tables from the result.
+    val catalog = sparkSession.sessionState.catalog
+    val db = databaseName.getOrElse(catalog.getCurrentDatabase)
+    var tables =
+      tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
+    tables = filterDataMaps(tables, sparkSession)
+    tables.map { tableIdent =>
+      val isTemp = catalog.isTemporaryTable(tableIdent)
+      Row(tableIdent.database.getOrElse("default"), tableIdent.table, isTemp)
+    }
+  }
+
+  /**
+   * Filters out datamap (aggregate) tables from the given table identifiers.
+   * @param tables table identifiers to filter
+   * @param sparkSession the active SparkSession
+   * @return the tables remaining after datamap tables are filtered out
+   */
+  private def filterDataMaps(tables: Seq[TableIdentifier],
+      sparkSession: SparkSession): Seq[TableIdentifier] = {
+    // Filter the carbon tables, look up each CarbonTable, and collect its datamap schemas.
+    // As of now this performs two lookups (filter carbon tables, getDataMapSchemaList).
+    // TODO : raise another PR (CARBONDATA-2103) to improve this to a single lookup
+    val allDatamapSchemaLists = tables.filter { table =>
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .tableExists(table)(sparkSession)
+    }.map { table =>
+      val carbonTable = CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
+      carbonTable.getTableInfo.getDataMapSchemaList.asScala
+    }
+    val allDatamapRelations = allDatamapSchemaLists
+      .flatMap { schemas =>
+        schemas.map(schema => schema.getRelationIdentifier.toString)
+      }
+    tables
+      .filter { table =>
+        !allDatamapRelations
+          .contains(table.database.getOrElse("default") + "." + table.identifier)
+      }
+  }
+}
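
Conceptually, filterDataMaps is a set difference: a table is hidden when its qualified name
matches the relation identifier of some datamap. A minimal, self-contained sketch of that
check with plain collections (the names are illustrative; the real command resolves them
through the Carbon metastore as in the diff above):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // relation identifiers collected from the datamap schemas, e.g. "db.table_datamapname"
    val datamapRelations = Set("default.tbl_1_preagg_sum", "default.tbl_1_agg2_hour")

    val tables = Seq(TableIdentifier("tbl_1"), TableIdentifier("tbl_1_preagg_sum"))
    val visible = tables.filterNot { t =>
      datamapRelations.contains(t.database.getOrElse("default") + "." + t.table)
    }
    // visible == Seq(TableIdentifier("tbl_1"))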

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 0fe0f96..0b62e10 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -22,11 +22,12 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionRes
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.ParserUtils._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, _}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, ShowTablesContext}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
+import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
@@ -336,4 +337,10 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
       super.visitCreateTable(ctx)
     }
   }
+
+  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = withOrigin(ctx) {
+    CarbonShowTablesCommand(
+      Option(ctx.db).map(_.getText),
+      Option(ctx.pattern).map(string))
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee1c4d42/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index 3c151f0..baadd04 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -27,12 +27,13 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.ParserUtils.string
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand
 import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
@@ -395,4 +396,10 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
     super.visitCreateTable(ctx)
   }
+
+  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = withOrigin(ctx) {
+    CarbonShowTablesCommand(
+      Option(ctx.db).map(_.getText),
+      Option(ctx.pattern).map(string))
+  }
 }