Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/05 00:44:39 UTC
[30/50] [abbrv] carbondata git commit: Support SHOW PARTITIONS for partitioned carbon tables
Support SHOW PARTITIONS for partitioned carbon tables
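
The command is answered from carbon partition metadata for range, hash, and list partitioned tables, and falls back to Hive's native handling for non-carbon tables. A hedged sketch of the user-visible behaviour, with the expected rows taken from the tests added below:

    // against the tables created in TestShowPartitions.scala
    sql("SHOW PARTITIONS hashTable")
    //   Row("empno=HASH_NUMBER(3)")
    sql("SHOW PARTITIONS rangeTable")
    //   Row("doj=default"), Row("doj<01-01-2010"), Row("01-01-2010<=doj<01-01-2015")
    sql("SHOW PARTITIONS partitionDB.hashTable")  // database-qualified names work too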
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c3bfc4ad
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c3bfc4ad
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c3bfc4ad
Branch: refs/heads/streaming_ingest
Commit: c3bfc4ad87dfc66582b31b54ead2109a8e760bdb
Parents: c2b39b2
Author: mayun <si...@163.com>
Authored: Sun Jun 25 12:12:06 2017 +0800
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Jun 29 13:15:51 2017 +0530
----------------------------------------------------------------------
.../examples/CarbonPartitionExample.scala | 147 +++++++++++++
.../examples/CarbonPartitionExample.scala | 49 ++++-
.../partition/TestShowPartitions.scala | 216 +++++++++++++++++++
.../carbondata/spark/util/CommonUtil.scala | 47 ++++
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 1 +
.../spark/sql/CarbonCatalystOperators.scala | 9 +-
.../org/apache/spark/sql/CarbonSqlParser.scala | 8 +-
.../execution/command/carbonTableSchema.scala | 28 +++
.../spark/sql/hive/CarbonStrategies.scala | 16 ++
.../sql/execution/command/DDLStrategy.scala | 8 +
.../execution/command/carbonTableSchema.scala | 28 +++
11 files changed, 551 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
new file mode 100644
index 0000000..2f55189
--- /dev/null
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import scala.collection.mutable.LinkedHashMap
+
+import org.apache.spark.sql.AnalysisException
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.util.ExampleUtils
+
+object CarbonPartitionExample {
+
+ def main(args: Array[String]) {
+ val cc = ExampleUtils.createCarbonContext("CarbonPartitionExample")
+ val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv"
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+ // non-partitioned table
+ cc.sql("DROP TABLE IF EXISTS t0")
+ cc.sql("""
+ | CREATE TABLE IF NOT EXISTS t0
+ | (
+ | vin String,
+ | logdate Timestamp,
+ | phonenumber Int,
+ | country String,
+ | area String
+ | )
+ | STORED BY 'carbondata'
+ """.stripMargin)
+
+ // range partition
+ cc.sql("DROP TABLE IF EXISTS t1")
+ cc.sql("""
+ | CREATE TABLE IF NOT EXISTS t1(
+ | vin STRING,
+ | phonenumber INT,
+ | country STRING,
+ | area STRING
+ | )
+ | PARTITIONED BY (logdate TIMESTAMP)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+ | 'RANGE_INFO'='2014/01/01,2015/01/01,2016/01/01')
+ """.stripMargin)
+
+ // hash partition
+ cc.sql("""
+ | CREATE TABLE IF NOT EXISTS t3(
+ | logdate Timestamp,
+ | phonenumber Int,
+ | country String,
+ | area String
+ | )
+ | PARTITIONED BY (vin String)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='5')
+ """.stripMargin)
+
+ // list partition
+ cc.sql("DROP TABLE IF EXISTS t5")
+ cc.sql("""
+ | CREATE TABLE IF NOT EXISTS t5(
+ | vin String,
+ | logdate Timestamp,
+ | phonenumber Int,
+ | area String
+ | )
+ | PARTITIONED BY (country string)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+ | 'LIST_INFO'='(China,United States),UK ,japan,(Canada,Russia), South Korea ')
+ """.stripMargin)
+
+ cc.sql(s"DROP TABLE IF EXISTS partitionDB.t9")
+ cc.sql(s"DROP DATABASE IF EXISTS partitionDB")
+ cc.sql(s"CREATE DATABASE partitionDB")
+ cc.sql(s"""
+ | CREATE TABLE IF NOT EXISTS partitionDB.t9(
+ | logdate Timestamp,
+ | phonenumber Int,
+ | country String,
+ | area String
+ | )
+ | PARTITIONED BY (vin String)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='5')
+ """.stripMargin)
+ // hive partition table
+ cc.sql("DROP TABLE IF EXISTS t7")
+ cc.sql("""
+ | create table t7(id int, name string) partitioned by (city string)
+ | row format delimited fields terminated by ','
+ """.stripMargin)
+ cc.sql("alter table t7 add partition (city = 'Hangzhou')")
+ // hive partition table in a non-default database
+ cc.sql(s"DROP TABLE IF EXISTS hiveDB.t7")
+ cc.sql(s"CREATE DATABASE IF NOT EXISTS hiveDB")
+ cc.sql("""
+ | create table hiveDB.t7(id int, name string) partitioned by (city string)
+ | row format delimited fields terminated by ','
+ """.stripMargin)
+ cc.sql("alter table hiveDB.t7 add partition (city = 'Shanghai')")
+ // show partitions
+ try {
+ cc.sql("SHOW PARTITIONS t0").show()
+ } catch {
+ case ex: AnalysisException => print(ex.getMessage())
+ }
+ cc.sql("SHOW PARTITIONS t1").show()
+ cc.sql("SHOW PARTITIONS t3").show()
+ cc.sql("SHOW PARTITIONS t5").show()
+ cc.sql("SHOW PARTITIONS t7").show()
+ cc.sql("use hiveDB").show()
+ cc.sql("SHOW PARTITIONS t7").show()
+ cc.sql("use default").show()
+ cc.sql("SHOW PARTITIONS partitionDB.t9").show()
+
+ cc.sql("DROP TABLE IF EXISTS t0")
+ cc.sql("DROP TABLE IF EXISTS t1")
+ cc.sql("DROP TABLE IF EXISTS t3")
+ cc.sql("DROP TABLE IF EXISTS t5")
+ cc.sql("DROP TABLE IF EXISTS t7")
+ cc.sql(s"DROP TABLE IF EXISTS hiveDb.t7")
+ cc.sql(s"DROP TABLE IF EXISTS partitionDB.t9")
+ cc.sql(s"DROP DATABASE IF EXISTS partitionDB")
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
index 8a0479f..4cdde42 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -19,6 +19,8 @@ package org.apache.carbondata.examples
import java.io.File
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -50,7 +52,6 @@ object CarbonPartitionExample {
// non-partitioned table
spark.sql("DROP TABLE IF EXISTS t0")
-
spark.sql("""
| CREATE TABLE IF NOT EXISTS t0
| (
@@ -65,7 +66,6 @@ object CarbonPartitionExample {
// range partition
spark.sql("DROP TABLE IF EXISTS t1")
-
spark.sql("""
| CREATE TABLE IF NOT EXISTS t1
| (
@@ -82,7 +82,6 @@ object CarbonPartitionExample {
// hash partition
spark.sql("DROP TABLE IF EXISTS t3")
-
spark.sql("""
| CREATE TABLE IF NOT EXISTS t3
| (
@@ -98,7 +97,6 @@ object CarbonPartitionExample {
// list partition
spark.sql("DROP TABLE IF EXISTS t5")
-
spark.sql("""
| CREATE TABLE IF NOT EXISTS t5
| (
@@ -113,14 +111,57 @@ object CarbonPartitionExample {
| 'LIST_INFO'='(China,United States),UK ,japan,(Canada,Russia), South Korea ')
""".stripMargin)
+ // hive partition table
+ spark.sql("DROP TABLE IF EXISTS t7")
+ spark.sql("""
+ | create table t7(id int, name string) partitioned by (city string)
+ | row format delimited fields terminated by ','
+ """.stripMargin)
+ spark.sql("alter table t7 add partition (city = 'Hangzhou')")
+
+ // partition table in a non-default database
+ try {
+ spark.sql(s"DROP TABLE IF EXISTS partitionDB.t9")
+ } catch {
+ case ex: NoSuchDatabaseException => print(ex.getMessage())
+ }
+ spark.sql(s"DROP DATABASE IF EXISTS partitionDB")
+ spark.sql(s"CREATE DATABASE partitionDB")
+ spark.sql(s"""
+ | CREATE TABLE IF NOT EXISTS partitionDB.t9(
+ | logdate Timestamp,
+ | phonenumber Int,
+ | country String,
+ | area String
+ | )
+ | PARTITIONED BY (vin String)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='5')
+ """.stripMargin)
+
// show tables
spark.sql("SHOW TABLES").show()
+ // show partitions
+ try {
+ spark.sql("""SHOW PARTITIONS t0""").show()
+ } catch {
+ case ex: AnalysisException => print(ex.getMessage())
+ }
+ spark.sql("""SHOW PARTITIONS t1""").show()
+ spark.sql("""SHOW PARTITIONS t3""").show()
+ spark.sql("""SHOW PARTITIONS t5""").show()
+ spark.sql("""SHOW PARTITIONS t7""").show()
+ spark.sql("""SHOW PARTITIONS partitionDB.t9""").show()
+
// drop table
spark.sql("DROP TABLE IF EXISTS t0")
spark.sql("DROP TABLE IF EXISTS t1")
spark.sql("DROP TABLE IF EXISTS t3")
spark.sql("DROP TABLE IF EXISTS t5")
+ spark.sql("DROP TABLE IF EXISTS t7")
+ spark.sql("DROP TABLE IF EXISTS partitionDB.t9")
+ spark.sql(s"DROP DATABASE IF EXISTS partitionDB")
spark.close()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala
new file mode 100644
index 0000000..7b53964
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.partition
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestShowPartitions extends QueryTest with BeforeAndAfterAll {
+ override def beforeAll = {
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+
+ sql("drop table if exists notPartitionTable")
+ sql("""
+ | CREATE TABLE notPartitionTable
+ | (
+ | vin String,
+ | logdate Timestamp,
+ | phonenumber Int,
+ | country String,
+ | area String
+ | )
+ | STORED BY 'carbondata'
+ """.stripMargin)
+
+ sql("drop table if exists hashTable")
+ sql(
+ """
+ | CREATE TABLE hashTable (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+ """.stripMargin)
+
+ sql("drop table if exists rangeTable")
+ sql(
+ """
+ | CREATE TABLE rangeTable (empno int, empname String, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (doj Timestamp)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+ | 'RANGE_INFO'='01-01-2010, 01-01-2015')
+ """.stripMargin)
+
+ sql("drop table if exists listTable")
+ sql(
+ """
+ | CREATE TABLE listTable (empno int, empname String, designation String, doj Timestamp,
+ | workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (workgroupcategory int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+ | 'LIST_INFO'='0, 1, (2, 3)')
+ """.stripMargin)
+
+ sql(s"CREATE DATABASE if not exists partitionDB")
+ sql("drop table if exists partitionDB.hashTable")
+ sql("drop table if exists partitionDB.rangeTable")
+ sql("drop table if exists partitionDB.listTable")
+ sql(
+ """
+ | CREATE TABLE partitionDB.hashTable (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+ """.stripMargin)
+ sql(
+ """
+ | CREATE TABLE partitionDB.rangeTable (empno int, empname String, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (doj Timestamp)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+ | 'RANGE_INFO'='01-01-2010, 01-01-2015')
+ """.stripMargin)
+ sql(
+ """
+ | CREATE TABLE partitionDB.listTable (empno int, empname String, designation String,
+ | doj Timestamp,workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (workgroupcategory int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+ | 'LIST_INFO'='0, 1, (2, 3)')
+ """.stripMargin)
+
+ sql("DROP TABLE IF EXISTS hiveTable")
+ sql("""
+ | create table hiveTable(id int, name string) partitioned by (city string)
+ | row format delimited fields terminated by ','
+ """.stripMargin)
+ sql("alter table hiveTable add partition (city = 'Hangzhou')")
+
+ sql(s"CREATE DATABASE if not exists hiveDB")
+ sql("DROP TABLE IF EXISTS hiveDB.hiveTable")
+ sql("""
+ | create table hiveDB.hiveTable(id int, name string) partitioned by (city string)
+ | row format delimited fields terminated by ','
+ """.stripMargin)
+ sql("alter table hiveDB.hiveTable add partition (city = 'Shanghai')")
+ }
+
+ test("show partition table: exception when show not partition table") {
+ val errorMessage =
+ intercept[AnalysisException] { sql("show partitions notPartitionTable").show() }
+ assert(errorMessage.getMessage.contains(
+ "SHOW PARTITIONS is not allowed on a table that is not partitioned: notpartitiontable"))
+ }
+
+ test("show partition table: hash table") {
+ // hash partitions are reported as a single HASH_NUMBER row
+ checkAnswer(sql("show partitions hashTable"), Seq(Row("empno=HASH_NUMBER(3)")))
+
+ }
+
+ test("show partition table: range partition") {
+ // default partition row first, then one row per range boundary
+ checkAnswer(sql("show partitions rangeTable"), Seq(Row("doj=default"),
+ Row("doj<01-01-2010"), Row("01-01-2010<=doj<01-01-2015")))
+ }
+
+ test("show partition table: list partition") {
+ // default partition row first, then one row per list entry
+ checkAnswer(sql("show partitions listTable"), Seq(Row("workgroupcategory=default"),
+ Row("workgroupcategory=0"), Row("workgroupcategory=1"), Row("workgroupcategory=2, 3")))
+
+ }
+ test("show partition table: not default db") {
+ // hash
+ checkAnswer(sql("show partitions partitionDB.hashTable"), Seq(Row("empno=HASH_NUMBER(3)")))
+ // range
+ checkAnswer(sql("show partitions partitionDB.rangeTable"), Seq(Row("doj=default"),
+ Row("doj<01-01-2010"), Row("01-01-2010<=doj<01-01-2015")))
+ // list
+ checkAnswer(sql("show partitions partitionDB.listTable"), Seq(Row("workgroupcategory=default"),
+ Row("workgroupcategory=0"), Row("workgroupcategory=1"), Row("workgroupcategory=2, 3")))
+
+ }
+
+ test("show partition table: hive partition table") {
+ // the same table name resolves against the current database
+ checkAnswer(sql("show partitions hiveTable"), Seq(Row("city=Hangzhou")))
+ sql("use hiveDB").show()
+ checkAnswer(sql("show partitions hiveTable"), Seq(Row("city=Shanghai")))
+ sql("use default").show()
+ }
+
+ override def afterAll = {
+ sql("drop table if exists notPartitionTable")
+ sql("drop table if exists hashTable")
+ sql("drop table if exists listTable")
+ sql("drop table if exists rangeTable")
+ sql("drop table if exists hiveTable")
+ try {
+ sql("drop table if exists partitionDB.hashTable")
+
+ } catch {
+ case ex: NoSuchDatabaseException => print(ex.getMessage())
+ }
+ try {
+ sql("drop table if exists partitionDB.rangeTable")
+ } catch {
+ case ex: NoSuchDatabaseException => print(ex.getMessage())
+ }
+ try {
+ sql("drop table if exists partitionDB.listTable")
+ } catch {
+ case ex: NoSuchDatabaseException => print(ex.getMessage())
+ }
+ try {
+ sql("drop table if exists hiveDB.hiveTable")
+ } catch {
+ case ex: NoSuchDatabaseException => print(ex.getMessage())
+ }
+ sql("DROP DATABASE if exists partitionDB")
+ sql("DROP DATABASE if exists hiveDB")
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index d3b6f8d..ac2e311 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -27,12 +27,20 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.RowFactory
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.types.StringType
import org.apache.spark.util.FileUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.processing.csvload.CSVInputFormat
@@ -544,4 +552,43 @@ object CommonUtil {
}
}
+  def getPartitionInfo(columnName: String, partitionType: PartitionType,
+      partitionInfo: PartitionInfo): Seq[Row] = {
+    val result = Seq.newBuilder[Row]
+    partitionType match {
+      case PartitionType.RANGE =>
+        result += RowFactory.create(columnName + "=default")
+        val rangeInfo = partitionInfo.getRangeInfo
+        val size = rangeInfo.size() - 1
+        for (index <- 0 to size) {
+          if (index == 0) {
+            result += RowFactory.create(columnName + "<" + rangeInfo.get(index))
+          } else {
+            result += RowFactory.create(rangeInfo.get(index - 1) + "<=" +
+              columnName + "<" + rangeInfo.get(index))
+          }
+        }
+      case PartitionType.RANGE_INTERVAL =>
+        result += RowFactory.create(columnName + "=")
+      case PartitionType.LIST =>
+        result += RowFactory.create(columnName + "=default")
+        val listInfo = partitionInfo.getListInfo
+        listInfo.asScala.foreach {
+          f =>
+            result += RowFactory.create(columnName + "=" +
+              f.toArray().mkString(", "))
+        }
+      case PartitionType.HASH =>
+        val hashNumber = partitionInfo.getNumPartitions
+        result += RowFactory.create(columnName + "=HASH_NUMBER(" + hashNumber + ")")
+      case _ =>
+        result += RowFactory.create(columnName + "=")
+    }
+    result.result()
+  }
+
+ def partitionInfoOutput: Seq[Attribute] = Seq(
+ AttributeReference("partition", StringType, nullable = false,
+ new MetadataBuilder().putString("comment", "partitions info").build())()
+ )
}
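
The RANGE branch above emits one default row plus a half-open interval per boundary. A minimal standalone sketch of that formatting, using a plain Scala List where the real code reads PartitionInfo.getRangeInfo (the object and method names below are illustrative, not part of this commit):

    object RangeRowsSketch {
      // Mirrors the RANGE case of getPartitionInfo: default row first,
      // then "col<b0", then "b(i-1)<=col<b(i)" for each later boundary.
      def rangeRows(column: String, boundaries: List[String]): List[String] = {
        val intervals = boundaries.zipWithIndex.map {
          case (bound, 0) => s"$column<$bound"
          case (bound, i) => s"${boundaries(i - 1)}<=$column<$bound"
        }
        s"$column=default" :: intervals
      }

      def main(args: Array[String]): Unit = {
        // RANGE_INFO='01-01-2010, 01-01-2015' from the tests above:
        rangeRows("doj", List("01-01-2010", "01-01-2015")).foreach(println)
        // prints: doj=default / doj<01-01-2010 / 01-01-2010<=doj<01-01-2015
      }
    }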
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 383d308..c565c31 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -112,6 +112,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val PARTITION_COUNT = carbonKeyWord("PARTITION_COUNT")
protected val PARTITIONDATA = carbonKeyWord("PARTITIONDATA")
protected val PARTITIONER = carbonKeyWord("PARTITIONER")
+ protected val PARTITIONS = carbonKeyWord("PARTITIONS")
protected val QUOTECHAR = carbonKeyWord("QUOTECHAR")
protected val RELATION = carbonKeyWord("RELATION")
protected val SCHEMA = carbonKeyWord("SCHEMA")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index c1a0dc2..024c54b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -20,12 +20,13 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
+import org.apache.spark.sql.catalyst.plans.logical.{ UnaryNode, _ }
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.types._
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+import org.apache.carbondata.spark.util.CommonUtil
/**
* Top command
@@ -137,6 +138,12 @@ case class DeleteRecords(
override def output: Seq[AttributeReference] = Seq.empty
}
+case class ShowPartitions(
+ table: TableIdentifier) extends LogicalPlan {
+ override def children: Seq[LogicalPlan] = Seq.empty
+ override def output: Seq[Attribute] = CommonUtil.partitionInfoOutput
+}
+
/**
* A logical plan representing insertion into Hive table.
* This plan ignores nullability of ArrayType, MapType, StructType unlike InsertIntoTable
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index f12e54b..a664104 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -61,7 +61,8 @@ class CarbonSqlParser() extends CarbonDDLSqlParser {
protected lazy val startCommand: Parser[LogicalPlan] =
createDatabase | dropDatabase | loadManagement | describeTable |
- showLoads | alterTable | updateTable | deleteRecords | useDatabase | createTable
+ showPartitions | showLoads | alterTable | updateTable | deleteRecords | useDatabase |
+ createTable
protected lazy val loadManagement: Parser[LogicalPlan] =
deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -487,6 +488,11 @@ class CarbonSqlParser() extends CarbonDDLSqlParser {
}
UpdateTable(relation, columns, selectStmt, where)
}
+  protected lazy val showPartitions: Parser[LogicalPlan] =
+    (SHOW ~> PARTITIONS ~> table) <~ opt(";") ^^ {
+      case table =>
+        ShowPartitions(table.tableIdentifier)
+    }
private def splitQuery(query: String): (String, String) = {
val stack = scala.collection.mutable.Stack[Char]()
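
The new production plugs into the existing keyword machinery (SHOW, PARTITIONS, and table all come from CarbonDDLSqlParser). For readers unfamiliar with scala-parser-combinators, a self-contained toy with the same grammar shape, assuming the scala-parser-combinators module is on the classpath; every name in it is illustrative:

    import scala.util.parsing.combinator.RegexParsers

    object ShowPartitionsGrammar extends RegexParsers {
      case class TableId(database: Option[String], table: String)

      private val ident: Parser[String] = "[a-zA-Z_][a-zA-Z0-9_]*".r
      // Optional "db." qualifier followed by the table name.
      private val table: Parser[TableId] =
        opt(ident <~ ".") ~ ident ^^ { case db ~ t => TableId(db, t) }
      // Case-insensitive keywords, optional trailing semicolon.
      val showPartitions: Parser[TableId] =
        "(?i)SHOW".r ~> "(?i)PARTITIONS".r ~> table <~ opt(";")

      def main(args: Array[String]): Unit = {
        println(parseAll(showPartitions, "SHOW PARTITIONS partitionDB.t9"))
        println(parseAll(showPartitions, "show partitions t1;"))
      }
    }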
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index ba22c3c..3477abb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -73,6 +73,34 @@ object Checker {
}
/**
+ * Command to show the partitions of a table
+ *
+ * @param tableIdentifier identifier of the table whose partitions are listed
+ */
+private[sql] case class ShowCarbonPartitionsCommand(
+ tableIdentifier: TableIdentifier) extends RunnableCommand {
+ val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
+ override val output = CommonUtil.partitionInfoOutput
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val relation = CarbonEnv.get.carbonMetastore
+ .lookupRelation1(tableIdentifier)(sqlContext).
+ asInstanceOf[CarbonRelation]
+ val carbonTable = relation.tableMeta.carbonTable
+    val tableName = carbonTable.getFactTableName
+    val partitionInfo = carbonTable.getPartitionInfo(
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+ if (partitionInfo == null) {
+ throw new AnalysisException(
+ s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
+ }
+    val partitionType = partitionInfo.getPartitionType
+    val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
+ LOGGER.info("partition column name:" + columnName)
+ CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
+ }
+}
+
+/**
* Command for the compaction in alter table command
*
* @param alterTableModel
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index f0cd33b..aba39f7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -316,6 +316,22 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
} else {
ExecutedCommand(HiveNativeCommand(sql)) :: Nil
}
+ case ShowPartitions(t) =>
+ val isCarbonTable = CarbonEnv.get.carbonMetastore
+ .tableExists(t)(sqlContext)
+ if (isCarbonTable) {
+ ExecutedCommand(ShowCarbonPartitionsCommand(t)) :: Nil
+ } else {
+          val tableName = t.table
+          val database = t.database
+          var sql: String = null
+          if (database.isEmpty) {
+            sql = s"show partitions $tableName"
+          } else {
+            sql = s"show partitions ${database.get}.$tableName"
+          }
+ ExecutedCommand(HiveNativeCommand(sql)) :: Nil
+ }
case _ =>
Nil
}
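
The strategy above routes carbon tables to ShowCarbonPartitionsCommand and rebuilds the statement for Hive otherwise (note the interpolation must use the database name itself, not the Option wrapper). A toy model of just that routing decision, with stand-in types instead of the metastore lookup and Spark plan classes (nothing below is carbon API):

    object ShowPartitionsRouting {
      final case class TableId(database: Option[String], table: String)

      // Stand-in for CarbonEnv.get.carbonMetastore.tableExists.
      val carbonTables = Set("t1", "t3", "t5")

      def route(t: TableId): String =
        if (carbonTables.contains(t.table)) {
          s"ShowCarbonPartitionsCommand(${t.table})"  // carbon-native path
        } else {
          t.database match {                          // fall back to Hive
            case Some(db) => s"HiveNativeCommand(show partitions $db.${t.table})"
            case None => s"HiveNativeCommand(show partitions ${t.table})"
          }
        }

      def main(args: Array[String]): Unit = {
        println(route(TableId(None, "t1")))            // carbon table
        println(route(TableId(Some("hiveDB"), "t7")))  // hive table, qualified
      }
    }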
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 7d0215f..6087736 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -115,6 +115,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil
+ case ShowPartitionsCommand(t, cols) =>
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(t)(sparkSession)
+ if (isCarbonTable) {
+ ExecutedCommandExec(ShowCarbonPartitionsCommand(t)) :: Nil
+ } else {
+ ExecutedCommandExec(ShowPartitionsCommand(t, cols)) :: Nil
+ }
case set@SetCommand(kv) =>
ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
case reset@ResetCommand =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3bfc4ad/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f9f556d..8fe4bd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -73,6 +73,34 @@ object Checker {
}
/**
+ * Command to show the partitions of a table
+ *
+ * @param tableIdentifier identifier of the table whose partitions are listed
+ */
+private[sql] case class ShowCarbonPartitionsCommand(
+ tableIdentifier: TableIdentifier) extends RunnableCommand {
+ val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
+ override val output = CommonUtil.partitionInfoOutput
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(tableIdentifier)(sparkSession).
+ asInstanceOf[CarbonRelation]
+ val carbonTable = relation.tableMeta.carbonTable
+    val tableName = carbonTable.getFactTableName
+    val partitionInfo = carbonTable.getPartitionInfo(
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+ if (partitionInfo == null) {
+ throw new AnalysisException(
+ s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
+ }
+    val partitionType = partitionInfo.getPartitionType
+    val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
+ LOGGER.info("partition column name:" + columnName)
+ CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
+ }
+}
+
+/**
* Command for the compaction in alter table command
*
* @param alterTableModel