You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/29 13:11:51 UTC

carbondata git commit: [CARBONDATA-1817] Reject creating datamap on streaming table

Repository: carbondata
Updated Branches:
  refs/heads/master 90fb6baf2 -> 684447383


[CARBONDATA-1817] Reject creating datamap on streaming table

This closes #1576


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/68444738
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/68444738
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/68444738

Branch: refs/heads/master
Commit: 684447383c86e100ce2f1710ec9351fdd3a83148
Parents: 90fb6ba
Author: Jacky Li <ja...@qq.com>
Authored: Wed Nov 29 19:13:00 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Nov 29 21:09:20 2017 +0800

----------------------------------------------------------------------
 .../command/datamap/CarbonCreateDataMapCommand.scala        | 8 ++++++++
 .../management/CarbonAlterTableCompactionCommand.scala      | 2 +-
 .../spark/carbondata/TestStreamingTableOperation.scala      | 9 +++++++++
 3 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/68444738/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 29c60ea..622bf0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateT
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * Below command class will be used to create datamap on table
@@ -39,6 +40,13 @@ case class CarbonCreateDataMapCommand(
   extends AtomicRunnableCommand {
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    // since streaming segment does not support building index and pre-aggregate yet,
+    // so streaming table does not support create datamap
+    val carbonTable =
+      CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
+    if (carbonTable.isStreamingTable) {
+      throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
+    }
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
         dmClassName.equalsIgnoreCase("preaggregate")) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68444738/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index cbc1412..462b055 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, DataCommand, DataProcessOperation, RunnableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, DataCommand}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.util.CarbonException
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68444738/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index a5268fb..7a30c2e 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -707,6 +707,15 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("do not support creating datamap on streaming table") {
+    assert(
+      intercept[MalformedCarbonCommandException](
+        sql("CREATE DATAMAP datamap ON TABLE source " +
+            "USING 'preaggregate'" +
+            " AS SELECT c1, sum(c2) FROM source GROUP BY c1")
+      ).getMessage.contains("Streaming table does not support creating datamap"))
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,