Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/06/21 04:14:58 UTC

[1/2] carbondata git commit: [CARBONDATA-2504][STREAM] Support StreamSQL for streaming job

Repository: carbondata
Updated Branches:
  refs/heads/master 6eb360e1f -> 2ea3b2dc5


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
index 9e338e7..daa1447 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
@@ -65,7 +65,7 @@ class LocalCarbonStore extends MetaCachedCarbonStore {
     Objects.requireNonNull(projectColumns);
 
     CarbonTable table = getTable(path);
-    if (table.isStreamingTable() || table.isHivePartitionTable()) {
+    if (table.isStreamingSink() || table.isHivePartitionTable()) {
       throw new UnsupportedOperationException("streaming and partition table is not supported");
     }
     // TODO: use InputFormat to prune data and read data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index 1696fdc..5a888ef 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.streaming.parser
 
-import java.nio.charset.Charset
 import java.text.SimpleDateFormat
 
 import org.apache.hadoop.conf.Configuration


[2/2] carbondata git commit: [CARBONDATA-2504][STREAM] Support StreamSQL for streaming job

Posted by ra...@apache.org.
[CARBONDATA-2504][STREAM] Support StreamSQL for streaming job

Currently, users need to write a Spark Streaming application to use the carbon streaming ingest feature, which is not easy for all users. By providing StreamSQL, users can manage streaming jobs more easily.

This closes #2328
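
For context, a minimal StreamSQL session might look like the sketch below. The full DDL grammar (the ON TABLE and STMPROPERTIES clauses) lives in CarbonSpark2SqlParser and is not reproduced in this diff, so treat the statements as an assumed illustration of the feature rather than its definitive syntax; the table names, schema, and path are hypothetical. The 'streaming'='source'/'sink' and 'format'/'path' tblproperties, and the mandatory 'trigger' and 'interval' options, follow from CarbonTable.isStreamingSource/isStreamingSink, CarbonCreateStreamCommand.prepareStreamingRelation, and StreamingOption in this change.

import org.apache.spark.sql.SparkSession

object StreamSqlExample {
  def main(args: Array[String]): Unit = {
    // Assumes a SparkSession with CarbonData support (e.g. a CarbonSession) is available.
    val spark = SparkSession.builder().appName("StreamSQL example").getOrCreate()

    // Source table: 'streaming'='source' plus 'format' and 'path' tblproperties,
    // as required by CarbonCreateStreamCommand.prepareStreamingRelation for file formats.
    spark.sql(
      """CREATE TABLE source_events(id INT, name STRING)
        |STORED BY 'carbondata'
        |TBLPROPERTIES('streaming'='source', 'format'='csv', 'path'='/tmp/stream_input')
      """.stripMargin)

    // Sink table: 'streaming'='sink' (or 'true'), see CarbonTable.isStreamingSink.
    spark.sql(
      """CREATE TABLE sink_events(id INT, name STRING)
        |STORED BY 'carbondata'
        |TBLPROPERTIES('streaming'='sink')
      """.stripMargin)

    // Start the streaming job; 'trigger' and 'interval' are mandatory (StreamingOption).
    spark.sql(
      """CREATE STREAM ingest_job ON TABLE sink_events
        |STMPROPERTIES('trigger'='ProcessingTime', 'interval'='5 seconds')
        |AS SELECT * FROM source_events
      """.stripMargin)

    // List running jobs (CarbonShowStreamsCommand), then stop and deregister the job
    // (CarbonDropStreamCommand -> StreamJobManager.stopStream).
    spark.sql("SHOW STREAMS ON TABLE sink_events").show()
    spark.sql("DROP STREAM ingest_job")
  }
}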


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2ea3b2dc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2ea3b2dc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2ea3b2dc

Branch: refs/heads/master
Commit: 2ea3b2dc5841c604c5fb44a6e7f18c7d2db8c543
Parents: 6eb360e
Author: Jacky Li <ja...@qq.com>
Authored: Mon May 21 21:49:33 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jun 21 09:44:22 2018 +0530

----------------------------------------------------------------------
 .../exceptions/NoSuchStreamException.java       |  35 ++
 .../core/metadata/schema/table/CarbonTable.java |  15 +-
 .../spark/util/SparkDataTypeConverterImpl.java  |  45 +++
 .../apache/carbondata/spark/CarbonOption.scala  |   9 +-
 .../carbondata/spark/StreamingOption.scala      |  67 ++++
 .../carbondata/spark/util/CarbonScalaUtil.scala |   5 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +
 .../spark/util/CarbonMetastoreTypes.scala       | 104 ++++++
 .../apache/spark/util/SparkTypeConverter.scala  | 134 +++++++
 .../carbondata/stream/StreamJobManager.scala    | 198 ++++++++++
 .../org/apache/spark/sql/CarbonSource.scala     |   2 +-
 .../spark/sql/CarbonSparkStreamingFactory.scala |   2 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |   2 +-
 .../stream/CarbonCreateStreamCommand.scala      | 130 +++++++
 .../stream/CarbonDropStreamCommand.scala        |  36 ++
 .../stream/CarbonShowStreamsCommand.scala       |  76 ++++
 .../command/table/CarbonDropTableCommand.scala  |   2 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |   2 +-
 .../sql/execution/strategy/DDLStrategy.scala    |   2 +-
 .../strategy/StreamingTableStrategy.scala       |   2 +-
 .../sql/hive/CarbonPreAggregateRules.scala      |   6 +-
 .../apache/spark/sql/hive/CarbonRelation.scala  |  89 +----
 .../sql/parser/CarbonSpark2SqlParser.scala      |  44 ++-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   4 +-
 .../apache/spark/util/SparkTypeConverter.scala  | 135 -------
 .../TestStreamingTableOperation.scala           | 359 ++++++++++++++++++-
 .../carbondata/store/LocalCarbonStore.java      |   2 +-
 .../streaming/parser/RowStreamParserImp.scala   |   1 -
 28 files changed, 1254 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/common/src/main/java/org/apache/carbondata/common/exceptions/NoSuchStreamException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/NoSuchStreamException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/NoSuchStreamException.java
new file mode 100644
index 0000000..77fa7fb
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/NoSuchStreamException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class NoSuchStreamException extends Exception {
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 133241232L;
+
+  public NoSuchStreamException(String streamName) {
+    super("stream '" + streamName + "' not found");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index b7bef28..c302b2b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -874,11 +874,20 @@ public class CarbonTable implements Serializable {
   }
 
   /**
-   * Return true if this is a streaming table (table with property "streaming"="true")
+   * Return true if this is a streaming table (table with property "streaming"="true" or "sink")
    */
-  public boolean isStreamingTable() {
+  public boolean isStreamingSink() {
     String streaming = getTableInfo().getFactTable().getTableProperties().get("streaming");
-    return streaming != null && streaming.equalsIgnoreCase("true");
+    return streaming != null &&
+        (streaming.equalsIgnoreCase("true") || streaming.equalsIgnoreCase("sink"));
+  }
+
+  /**
+   * Return true if this is a streaming source (table with property "streaming"="source")
+   */
+  public boolean isStreamingSource() {
+    String streaming = getTableInfo().getFactTable().getTableProperties().get("streaming");
+    return streaming != null && streaming.equalsIgnoreCase("source");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
index f6dc65b..3951642 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
@@ -19,21 +19,29 @@ package org.apache.carbondata.spark.util;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.DataTypeConverter;
 
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.util.GenericArrayData;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.util.CarbonMetastoreTypes;
+import org.apache.spark.util.SparkTypeConverter;
 
 /**
  * Convert java data type to spark data type
@@ -171,4 +179,41 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
     }
     return fields;
   }
+
+  public static StructType convertToSparkSchema(CarbonTable table, ColumnSchema[] carbonColumns) {
+    List<StructField> fields = new ArrayList<>(carbonColumns.length);
+    for (int i = 0; i < carbonColumns.length; i++) {
+      ColumnSchema carbonColumn = carbonColumns[i];
+      DataType dataType = carbonColumn.getDataType();
+      if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
+        fields.add(new StructField(carbonColumn.getColumnName(),
+            new DecimalType(carbonColumn.getPrecision(), carbonColumn.getScale()),
+            true, Metadata.empty()));
+      } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isStructType(dataType)) {
+        fields.add(
+            new StructField(
+                carbonColumn.getColumnName(),
+                CarbonMetastoreTypes.toDataType(
+                    String.format("struct<%s>",
+                        SparkTypeConverter.getStructChildren(table, carbonColumn.getColumnName()))),
+                true,
+                Metadata.empty()));
+      } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isArrayType(dataType)) {
+        fields.add(
+            new StructField(
+                carbonColumn.getColumnName(),
+                CarbonMetastoreTypes.toDataType(
+                    String.format("array<%s>",
+                        SparkTypeConverter.getArrayChildren(
+                            table,
+                            carbonColumn.getColumnName()))),
+                true,
+                Metadata.empty()));
+      } else {
+        fields.add(new StructField(carbonColumn.getColumnName(),
+            convertCarbonToSparkDataType(carbonColumn.getDataType()), true, Metadata.empty()));
+      }
+    }
+    return new StructType(fields.toArray(new StructField[0]));
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index e854bbe..a48e63d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -57,8 +57,13 @@ class CarbonOption(options: Map[String, String]) {
   def isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
                                     options.contains("bucketnumber")
 
-  def isStreaming: Boolean =
-    options.getOrElse("streaming", "false").toBoolean
+  def isStreaming: Boolean = {
+    var stream = options.getOrElse("streaming", "false")
+    if (stream.equalsIgnoreCase("sink") || stream.equalsIgnoreCase("source")) {
+      stream = "true"
+    }
+    stream.toBoolean
+  }
 
   def overwriteEnabled: Boolean =
     options.getOrElse("overwrite", "false").toBoolean

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
new file mode 100644
index 0000000..c724474
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+class StreamingOption(val userInputMap: Map[String, String]) {
+  lazy val trigger: Trigger = {
+    val trigger = userInputMap.getOrElse(
+      "trigger", throw new MalformedCarbonCommandException("trigger must be specified"))
+    val interval = userInputMap.getOrElse(
+      "interval", throw new MalformedCarbonCommandException("interval must be specified"))
+    trigger match {
+      case "ProcessingTime" => ProcessingTime(interval)
+      case others => throw new MalformedCarbonCommandException("invalid trigger: " + trigger)
+    }
+  }
+
+  def checkpointLocation(tablePath: String): String =
+    userInputMap.getOrElse(
+      "checkpointLocation",
+      CarbonTablePath.getStreamingCheckpointDir(tablePath))
+
+  lazy val timeStampFormat: String =
+    userInputMap.getOrElse("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+
+  lazy val dateFormat: String =
+    userInputMap.getOrElse("dateformat", CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+
+  lazy val rowParser: String =
+    userInputMap.getOrElse(CarbonStreamParser.CARBON_STREAM_PARSER,
+      CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+
+  lazy val remainingOption: Map[String, String] = {
+    // copy the user input map and remove the fix options
+    val mutableMap = mutable.Map[String, String]() ++= userInputMap
+    mutableMap.remove("checkpointLocation")
+    mutableMap.remove("timestampformat")
+    mutableMap.remove("dateformat")
+    mutableMap.remove("trigger")
+    mutableMap.remove("interval")
+    mutableMap.remove(CarbonStreamParser.CARBON_STREAM_PARSER)
+    mutableMap.toMap
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 6227655..3e94a66 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumn
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
 import org.apache.carbondata.core.metadata.ColumnIdentifier
-import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField}
+import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema}
@@ -104,7 +104,8 @@ object CarbonScalaUtil {
 
   def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
     if (CarbonDataTypes.isDecimal(dataType)) {
-      DecimalType.SYSTEM_DEFAULT
+      DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
+        dataType.asInstanceOf[CarbonDecimalType].getScale)
     } else {
       dataType match {
         case CarbonDataTypes.STRING => StringType

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 0d53a73..350fc36 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -184,6 +184,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val SELECT = carbonKeyWord("SELECT")
   protected val REBUILD = carbonKeyWord("REBUILD")
   protected val DEFERRED = carbonKeyWord("DEFERRED")
+  protected val STREAM = carbonKeyWord("STREAM")
+  protected val STREAMS = carbonKeyWord("STREAMS")
+  protected val STMPROPERTIES = carbonKeyWord("STMPROPERTIES")
 
   protected val doubleQuotedString = "\"([^\"]+)\"".r
   protected val singleQuotedString = "'([^']+)'".r

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonMetastoreTypes.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonMetastoreTypes.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonMetastoreTypes.scala
new file mode 100644
index 0000000..6dbd3a3
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonMetastoreTypes.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.util.parsing.combinator.RegexParsers
+
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CarbonException
+
+object CarbonMetastoreTypes extends RegexParsers {
+  protected lazy val primitiveType: Parser[DataType] =
+    "string" ^^^ StringType |
+    "varchar" ^^^ StringType |
+    "float" ^^^ FloatType |
+    "int" ^^^ IntegerType |
+    "tinyint" ^^^ ShortType |
+    "short" ^^^ ShortType |
+    "double" ^^^ DoubleType |
+    "long" ^^^ LongType |
+    "binary" ^^^ BinaryType |
+    "boolean" ^^^ BooleanType |
+    fixedDecimalType |
+    "decimal" ^^^ "decimal" ^^^ DecimalType(10, 0) |
+    "varchar\\((\\d+)\\)".r ^^^ StringType |
+    "date" ^^^ DateType |
+    "timestamp" ^^^ TimestampType
+
+  protected lazy val fixedDecimalType: Parser[DataType] =
+    "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
+      case precision ~ scale =>
+        DecimalType(precision.toInt, scale.toInt)
+    }
+
+  protected lazy val arrayType: Parser[DataType] =
+    "array" ~> "<" ~> dataType <~ ">" ^^ {
+      case tpe => ArrayType(tpe)
+    }
+
+  protected lazy val mapType: Parser[DataType] =
+    "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
+      case t1 ~ _ ~ t2 => MapType(t1, t2)
+    }
+
+  protected lazy val structField: Parser[StructField] =
+    "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
+      case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
+    }
+
+  protected lazy val structType: Parser[DataType] =
+    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
+      case fields => StructType(fields)
+    }
+
+  protected lazy val dataType: Parser[DataType] =
+    arrayType |
+    mapType |
+    structType |
+    primitiveType
+
+  def toDataType(metastoreType: String): DataType = {
+    parseAll(dataType, metastoreType) match {
+      case Success(result, _) => result
+      case _: NoSuccess =>
+        CarbonException.analysisException(s"Unsupported dataType: $metastoreType")
+    }
+  }
+
+  def toMetastoreType(dt: DataType): String = {
+    dt match {
+      case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
+      case StructType(fields) =>
+        s"struct<${
+          fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
+            .mkString(",")
+        }>"
+      case StringType => "string"
+      case FloatType => "float"
+      case IntegerType => "int"
+      case ShortType => "tinyint"
+      case DoubleType => "double"
+      case LongType => "bigint"
+      case BinaryType => "binary"
+      case BooleanType => "boolean"
+      case DecimalType() => "decimal"
+      case TimestampType => "timestamp"
+      case DateType => "date"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark-common/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
new file mode 100644
index 0000000..8a9277c
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.Objects
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.types
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
+
+private[spark] object SparkTypeConverter {
+
+  def createSparkSchema(table: CarbonTable, columns: Seq[String]): StructType = {
+    Objects.requireNonNull(table)
+    Objects.requireNonNull(columns)
+    if (columns.isEmpty) {
+      throw new IllegalArgumentException("column list is empty")
+    }
+    val fields = new java.util.ArrayList[StructField](columns.size)
+    val allColumns = table.getTableInfo.getFactTable.getListOfColumns.asScala
+
+    // find the column and add it to fields array
+    columns.foreach { column =>
+      val col = allColumns.find(_.getColumnName.equalsIgnoreCase(column)).getOrElse(
+        throw new IllegalArgumentException(column + " does not exist")
+      )
+      fields.add(StructField(col.getColumnName, convertCarbonToSparkDataType(col, table)))
+    }
+    StructType(fields)
+  }
+
+  /**
+   * Converts from carbon datatype to corresponding spark datatype.
+   */
+  def convertCarbonToSparkDataType(
+      columnSchema: ColumnSchema,
+      table: CarbonTable): types.DataType = {
+    if (CarbonDataTypes.isDecimal(columnSchema.getDataType)) {
+      val scale = columnSchema.getScale
+      val precision = columnSchema.getPrecision
+      if (scale == 0 && precision == 0) {
+        DecimalType(18, 2)
+      } else {
+        DecimalType(precision, scale)
+      }
+    } else if (CarbonDataTypes.isArrayType(columnSchema.getDataType)) {
+      CarbonMetastoreTypes
+        .toDataType(s"array<${ getArrayChildren(table, columnSchema.getColumnName) }>")
+    } else if (CarbonDataTypes.isStructType(columnSchema.getDataType)) {
+      CarbonMetastoreTypes
+        .toDataType(s"struct<${ getStructChildren(table, columnSchema.getColumnName) }>")
+    } else {
+      columnSchema.getDataType match {
+        case CarbonDataTypes.STRING => StringType
+        case CarbonDataTypes.SHORT => ShortType
+        case CarbonDataTypes.INT => IntegerType
+        case CarbonDataTypes.LONG => LongType
+        case CarbonDataTypes.DOUBLE => DoubleType
+        case CarbonDataTypes.BOOLEAN => BooleanType
+        case CarbonDataTypes.TIMESTAMP => TimestampType
+        case CarbonDataTypes.DATE => DateType
+      }
+    }
+  }
+
+  def getArrayChildren(table: CarbonTable, dimName: String): String = {
+    table.getChildren(dimName).asScala.map(childDim => {
+      childDim.getDataType.getName.toLowerCase match {
+        case "array" => s"array<${ getArrayChildren(table, childDim.getColName) }>"
+        case "struct" => s"struct<${ getStructChildren(table, childDim.getColName) }>"
+        case dType => addDecimalScaleAndPrecision(childDim, dType)
+      }
+    }).mkString(",")
+  }
+
+  def getStructChildren(table: CarbonTable, dimName: String): String = {
+    table.getChildren(dimName).asScala.map(childDim => {
+      childDim.getDataType.getName.toLowerCase match {
+        case "array" => s"${
+          childDim.getColName.substring(dimName.length + 1)
+        }:array<${ getArrayChildren(table, childDim.getColName) }>"
+        case "struct" => s"${
+          childDim.getColName.substring(dimName.length + 1)
+        }:struct<${ table.getChildren(childDim.getColName)
+          .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",")
+        }>"
+        case dType => s"${ childDim.getColName
+          .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
+      }
+    }).mkString(",")
+  }
+
+  def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
+    var dType = dataType
+    if (CarbonDataTypes.isDecimal(dimval.getDataType)) {
+      dType +=
+      "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
+    }
+    dType
+  }
+
+  private def recursiveMethod(
+      table: CarbonTable, dimName: String, childDim: CarbonDimension) = {
+    childDim.getDataType.getName.toLowerCase match {
+      case "array" => s"${
+        childDim.getColName.substring(dimName.length + 1)
+      }:array<${ getArrayChildren(table, childDim.getColName) }>"
+      case "struct" => s"${
+        childDim.getColName.substring(dimName.length + 1)
+      }:struct<${ getStructChildren(table, childDim.getColName) }>"
+      case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
new file mode 100644
index 0000000..59e924d
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream
+
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import org.apache.carbondata.common.exceptions.NoSuchStreamException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.carbondata.streaming.CarbonStreamException
+
+/**
+ * A manager to start and stop a stream job for StreamSQL.
+ * This stream job is only available to the driver memory and not persisted
+ * so other drivers cannot see ongoing stream jobs.
+ */
+object StreamJobManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // map of stream name to job desc
+  private val jobs = new ConcurrentHashMap[String, StreamJobDesc]()
+
+  private def validateSourceTable(source: CarbonTable): Unit = {
+    if (!source.isStreamingSource) {
+      throw new MalformedCarbonCommandException(s"Table ${source.getTableName} is not " +
+                                                "streaming source table " +
+                                                "('streaming' tblproperty is not 'source')")
+    }
+  }
+
+  private def validateSinkTable(querySchema: StructType, sink: CarbonTable): Unit = {
+    if (!sink.isStreamingSink) {
+      throw new MalformedCarbonCommandException(s"Table ${sink.getTableName} is not " +
+                                                "streaming sink table " +
+                                                "('streaming' tblproperty is not 'sink' or 'true')")
+    }
+    val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
+      StructField(column.getColName,
+        CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType))
+    }
+    if (!querySchema.equals(StructType(fields))) {
+      throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
+                                                s"does not match query output")
+    }
+  }
+
+  /**
+   * Start a spark streaming job
+   * @param sparkSession session instance
+   * @param ifNotExists if not exists is set or not
+   * @param streamName name of the stream
+   * @param sourceTable stream source table
+   * @param sinkTable sink table to insert to
+   * @param query query string
+   * @param streamDf dataframe that containing the query from stream source table
+   * @param options options provided by user
+   * @return Job ID
+   */
+  def startStream(
+      sparkSession: SparkSession,
+      ifNotExists: Boolean,
+      streamName: String,
+      sourceTable: CarbonTable,
+      sinkTable: CarbonTable,
+      query: String,
+      streamDf: DataFrame,
+      options: StreamingOption): String = {
+    val latch = new CountDownLatch(1)
+    var exception: Throwable = null
+    var job: StreamingQuery = null
+
+    if (jobs.containsKey(streamName)) {
+      if (ifNotExists) {
+        return jobs.get(streamName).streamingQuery.id.toString
+      } else {
+        throw new MalformedCarbonCommandException(s"Stream Name $streamName already exists")
+      }
+    }
+
+    validateSourceTable(sourceTable)
+    validateSinkTable(streamDf.schema, sinkTable)
+
+    // start a new thread to run the streaming ingest job, the job will be running
+    // until user stops it by STOP STREAM JOB
+    val thread = new Thread(new Runnable {
+      override def run(): Unit = {
+        try {
+          job = streamDf.writeStream
+            .format("carbondata")
+            .trigger(options.trigger)
+            .option("checkpointLocation", options.checkpointLocation(sinkTable.getTablePath))
+            .option("dateformat", options.dateFormat)
+            .option("timestampformat", options.timeStampFormat)
+            .option("carbon.stream.parser", options.rowParser)
+            .option("dbName", sinkTable.getDatabaseName)
+            .option("tableName", sinkTable.getTableName)
+            .options(options.remainingOption)
+            .start()
+          latch.countDown()
+          job.awaitTermination()
+        } catch {
+          case e: Throwable =>
+            LOGGER.error(e)
+            exception = e
+            latch.countDown()
+        }
+      }
+    })
+    thread.start()
+
+    // wait for max 10 seconds for the streaming job to start
+    if (latch.await(10, TimeUnit.SECONDS)) {
+      if (exception != null) {
+        throw exception
+      }
+
+      jobs.put(
+        streamName,
+        StreamJobDesc(job, streamName, sourceTable.getDatabaseName, sourceTable.getTableName,
+          sinkTable.getDatabaseName, sinkTable.getTableName, query, thread))
+
+      LOGGER.audit(s"STREAM $streamName started with job id '${job.id.toString}', " +
+                   s"from ${sourceTable.getDatabaseName}.${sourceTable.getTableName} " +
+                   s"to ${sinkTable.getDatabaseName}.${sinkTable.getTableName}")
+      job.id.toString
+    } else {
+      thread.interrupt()
+      throw new CarbonStreamException("Streaming job takes too long to start")
+    }
+  }
+
+  /**
+   * Stop a streaming job
+   * @param streamName name of the stream
+   * @param ifExists if exists is set or not
+   */
+  def stopStream(streamName: String, ifExists: Boolean): Unit = {
+    if (jobs.containsKey(streamName)) {
+      val jobDesc = jobs.get(streamName)
+      jobDesc.streamingQuery.stop()
+      jobDesc.thread.interrupt()
+      jobs.remove(streamName)
+      LOGGER.audit(s"STREAM $streamName stopped, job id '${jobDesc.streamingQuery.id.toString}', " +
+                   s"from ${jobDesc.sourceDb}.${jobDesc.sourceTable} " +
+                   s"to ${jobDesc.sinkDb}.${jobDesc.sinkTable}")
+    } else {
+      if (!ifExists) {
+        throw new NoSuchStreamException(streamName)
+      }
+    }
+  }
+
+  /**
+   * Return all running jobs
+   * @return running jobs
+   */
+  def getAllJobs: Set[StreamJobDesc] = jobs.values.asScala.toSet
+
+}
+
+/**
+ * A job description for the StreamSQL job
+ */
+private[stream] case class StreamJobDesc(
+    streamingQuery: StreamingQuery,
+    streamName: String,
+    sourceDb: String,
+    sourceTable: String,
+    sinkDb: String,
+    sinkTable: String,
+    query: String,
+    thread: Thread,
+    startTime: Long = System.currentTimeMillis()
+)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 0a23d06..0d13d4c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -233,7 +233,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     }
     val sparkSession = sqlContext.sparkSession
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    if (!carbonTable.isStreamingTable) {
+    if (!carbonTable.isStreamingSink) {
       throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
                                       s"${carbonTable.getTableName} is not a streaming table")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
index 15b038b..cedb381 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
@@ -47,7 +47,7 @@ object CarbonSparkStreamingFactory {
       }
       val carbonTable = CarbonEnv.getCarbonTable(Some(dbName),
         tableName)(spark)
-      if (!carbonTable.isStreamingTable) {
+      if (!carbonTable.isStreamingSink) {
         throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
                                         s"${carbonTable.getTableName} is not a streaming table")
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 25589d4..1ae872a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -73,7 +73,7 @@ case class CarbonCreateDataMapCommand(
     }
 
     if (mainTable != null &&
-        mainTable.isStreamingTable &&
+        mainTable.isStreamingSink &&
         !(dmProviderName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.toString)
           || dmProviderName.equalsIgnoreCase(DataMapClassProvider.TIMESERIES.toString))) {
       throw new MalformedCarbonCommandException(s"Streaming table does not support creating " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
new file mode 100644
index 0000000..d3b178c
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.types.{StringType, StructType}
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
+import org.apache.carbondata.stream.StreamJobManager
+
+/**
+ * This command will start a Spark streaming job to insert rows from source to sink
+ */
+case class CarbonCreateStreamCommand(
+    streamName: String,
+    sinkDbName: Option[String],
+    sinkTableName: String,
+    ifNotExists: Boolean,
+    optionMap: Map[String, String],
+    query: String
+) extends DataCommand {
+
+  override def output: Seq[Attribute] =
+    Seq(AttributeReference("Stream Name", StringType, nullable = false)(),
+      AttributeReference("JobId", StringType, nullable = false)(),
+      AttributeReference("Status", StringType, nullable = false)())
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val df = sparkSession.sql(query)
+    var sourceTable: CarbonTable = null
+
+    // find the streaming source table in the query
+    // and replace it with StreamingRelation
+    val streamLp = df.logicalPlan transform {
+      case r: LogicalRelation
+        if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
+        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
+        if (sourceTable != null && sourceTable.getTableName != source.getTableName) {
+          throw new MalformedCarbonCommandException(
+            "Stream query on more than one stream source table is not supported")
+        }
+        sourceTable = source
+        streamingRelation
+      case plan: LogicalPlan => plan
+    }
+
+    if (sourceTable == null) {
+      throw new MalformedCarbonCommandException("Must specify stream source table in the query")
+    }
+
+    // start the streaming job
+    val jobId = StreamJobManager.startStream(
+      sparkSession = sparkSession,
+      ifNotExists = ifNotExists,
+      streamName = streamName,
+      sourceTable = sourceTable,
+      sinkTable = CarbonEnv.getCarbonTable(sinkDbName, sinkTableName)(sparkSession),
+      query = query,
+      streamDf = Dataset.ofRows(sparkSession, streamLp),
+      options = new StreamingOption(optionMap)
+    )
+    Seq(Row(streamName, jobId, "RUNNING"))
+  }
+
+  private def prepareStreamingRelation(
+      sparkSession: SparkSession,
+      r: LogicalRelation): (CarbonTable, StreamingRelation) = {
+    val sourceTable = r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+    val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
+    val format = tblProperty.get("format")
+    if (format == null) {
+      throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
+    }
+    val streamReader = sparkSession.readStream
+      .schema(getSparkSchema(sourceTable))
+      .format(format)
+    val dataFrame = format match {
+      case "csv" | "text" | "json" | "parquet" =>
+        if (!tblProperty.containsKey("path")) {
+          throw new MalformedCarbonCommandException(
+            s"'path' tblproperty should be provided for '$format' format")
+        }
+        streamReader.load(tblProperty.get("path"))
+      case "kafka" | "socket" =>
+        streamReader.load()
+      case other =>
+        throw new MalformedCarbonCommandException(s"Streaming from $format is not supported")
+    }
+    val streamRelation = dataFrame.logicalPlan.asInstanceOf[StreamingRelation]
+
+    // Since SparkSQL analyzer will match the UUID in attribute,
+    // create a new StreamRelation and re-use the same attribute from LogicalRelation
+    (sourceTable,
+      StreamingRelation(streamRelation.dataSource, streamRelation.sourceName, r.output))
+  }
+
+  private def getSparkSchema(sourceTable: CarbonTable): StructType = {
+    val cols = sourceTable.getTableInfo.getFactTable.getListOfColumns.asScala.toArray
+    val sortedCols = cols.filter(_.getSchemaOrdinal != -1)
+      .sortWith(_.getSchemaOrdinal < _.getSchemaOrdinal)
+    SparkDataTypeConverterImpl.convertToSparkSchema(sourceTable, sortedCols)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala
new file mode 100644
index 0000000..82b84ef
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.command.MetadataCommand
+
+import org.apache.carbondata.stream.StreamJobManager
+
+/**
+ * Stop the stream for specified sink table
+ */
+case class CarbonDropStreamCommand(
+    streamName: String,
+    ifExists: Boolean
+) extends MetadataCommand {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    StreamJobManager.stopStream(streamName, ifExists)
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
new file mode 100644
index 0000000..49c2ffb
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.types.StringType
+
+import org.apache.carbondata.stream.StreamJobManager
+
+/**
+ * Show all streams created or on a specified table
+ */
+case class CarbonShowStreamsCommand(
+    tableOp: Option[TableIdentifier]
+) extends MetadataCommand {
+  override def output: Seq[Attribute] = {
+    Seq(AttributeReference("Stream Name", StringType, nullable = false)(),
+      AttributeReference("JobId", StringType, nullable = false)(),
+      AttributeReference("Status", StringType, nullable = false)(),
+      AttributeReference("Source", StringType, nullable = false)(),
+      AttributeReference("Sink", StringType, nullable = false)(),
+      AttributeReference("Start Time", StringType, nullable = false)(),
+      AttributeReference("Time Elapse", StringType, nullable = false)())
+  }
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val jobs = tableOp match {
+      case None => StreamJobManager.getAllJobs.toSeq
+      case Some(table) =>
+        val carbonTable = CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
+        StreamJobManager.getAllJobs.filter { job =>
+          job.sinkTable.equalsIgnoreCase(carbonTable.getTableName) &&
+          job.sinkDb.equalsIgnoreCase(carbonTable.getDatabaseName)
+        }.toSeq
+    }
+
+    jobs.map { job =>
+      val elapsedTime = System.currentTimeMillis() - job.startTime
+      Row(
+        job.streamName,
+        job.streamingQuery.id.toString,
+        if (job.streamingQuery.isActive) "RUNNING" else "FAILED",
+        s"${ job.sourceDb }.${ job.sourceTable }",
+        s"${ job.sinkDb }.${ job.sinkTable }",
+        new Date(job.startTime).toString,
+        String.format(
+          "%s days, %s hours, %s min, %s sec",
+          TimeUnit.MILLISECONDS.toDays(elapsedTime).toString,
+          TimeUnit.MILLISECONDS.toHours(elapsedTime).toString,
+          TimeUnit.MILLISECONDS.toMinutes(elapsedTime).toString,
+          TimeUnit.MILLISECONDS.toSeconds(elapsedTime).toString)
+      )
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 9576fb1..776750b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -71,7 +71,7 @@ case class CarbonDropTableCommand(
         throw new ConcurrentOperationException(carbonTable, "loading", "drop table")
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-      if (carbonTable.isStreamingTable) {
+      if (carbonTable.isStreamingSink) {
         // streaming table should acquire streaming.lock
         carbonLocks += CarbonLockUtil.getLockObject(identifier, LockUsage.STREAMING_LOCK)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index df4c742..30db50a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -100,7 +100,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     val updateDeltaMetadata = segmentUpdateStatusManager.readLoadMetadata()
     if (updateDeltaMetadata != null && updateDeltaMetadata.nonEmpty) {
       false
-    } else if (relation.carbonTable.isStreamingTable) {
+    } else if (relation.carbonTable.isStreamingSink) {
       false
     } else {
       true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 468121b..f5c5188 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -274,7 +274,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         // TODO remove this limitation later
         val property = properties.find(_._1.equalsIgnoreCase("streaming"))
         if (property.isDefined) {
-          if (carbonTable.isStreamingTable) {
+          if (carbonTable.isStreamingSink) {
             throw new MalformedCarbonCommandException(
               "Streaming property can not be changed once it is 'true'")
           } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index f9c6c5f..f4240e4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -75,7 +75,7 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
     try {
       streaming = CarbonEnv.getCarbonTable(
         tableIdentifier.database, tableIdentifier.table)(sparkSession)
-        .isStreamingTable
+        .isStreamingSink
     } catch {
       case e: Exception =>
         streaming = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 1bb328c..c59246d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -664,7 +664,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       aggregationDataMapSchema: DataMapSchema,
       factAggPlan: LogicalPlan): LogicalPlan = {
     // to handle streaming table with pre aggregate
-    if (carbonTable.isStreamingTable) {
+    if (carbonTable.isStreamingSink) {
       setSegmentsForStreaming(carbonTable, aggregationDataMapSchema)
       // get new fact expression
       val factExp = updateFactTablePlanForStreaming(factAggPlan)
@@ -1399,11 +1399,11 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             parentTable,
             parentLogicalPlan,
             aggExpColumnMapping.get,
-            parentTable.isStreamingTable)
+            parentTable.isStreamingSink)
         } else {
           Seq(attr)
         }
-        if(!parentTable.isStreamingTable) {
+        if(!parentTable.isStreamingSink) {
           // for normal table
           // generate new expression id for child
           val newExpressionId = NamedExpression.newExprId

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 2f2048d..80d850b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -27,15 +27,13 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.util.SparkTypeConverter
+import org.apache.spark.util.{CarbonMetastoreTypes, SparkTypeConverter}
 
-import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
 /**
@@ -100,7 +98,7 @@ case class CarbonRelation(
     val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
       .asScala
     // convert each column to Attribute
-    columns.filter(!_.isInvisible).map { column =>
+    columns.filter(!_.isInvisible).map { column: CarbonColumn =>
       if (column.isDimension()) {
         val output: DataType = column.getDataType.getName.toLowerCase match {
           case "array" =>
@@ -197,84 +195,3 @@ case class CarbonRelation(
   }
 
 }
-
-object CarbonMetastoreTypes extends RegexParsers {
-  protected lazy val primitiveType: Parser[DataType] =
-    "string" ^^^ StringType |
-    "varchar" ^^^ StringType |
-    "float" ^^^ FloatType |
-    "int" ^^^ IntegerType |
-    "tinyint" ^^^ ShortType |
-    "short" ^^^ ShortType |
-    "double" ^^^ DoubleType |
-    "long" ^^^ LongType |
-    "binary" ^^^ BinaryType |
-    "boolean" ^^^ BooleanType |
-    fixedDecimalType |
-    "decimal" ^^^ "decimal" ^^^ DecimalType(10, 0) |
-    "varchar\\((\\d+)\\)".r ^^^ StringType |
-    "date" ^^^ DateType |
-    "timestamp" ^^^ TimestampType
-
-  protected lazy val fixedDecimalType: Parser[DataType] =
-    "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
-      case precision ~ scale =>
-        DecimalType(precision.toInt, scale.toInt)
-    }
-
-  protected lazy val arrayType: Parser[DataType] =
-    "array" ~> "<" ~> dataType <~ ">" ^^ {
-      case tpe => ArrayType(tpe)
-    }
-
-  protected lazy val mapType: Parser[DataType] =
-    "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
-      case t1 ~ _ ~ t2 => MapType(t1, t2)
-    }
-
-  protected lazy val structField: Parser[StructField] =
-    "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
-      case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
-    }
-
-  protected lazy val structType: Parser[DataType] =
-    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
-      case fields => StructType(fields)
-    }
-
-  protected lazy val dataType: Parser[DataType] =
-    arrayType |
-    mapType |
-    structType |
-    primitiveType
-
-  def toDataType(metastoreType: String): DataType = {
-    parseAll(dataType, metastoreType) match {
-      case Success(result, _) => result
-      case _: NoSuccess =>
-        CarbonException.analysisException(s"Unsupported dataType: $metastoreType")
-    }
-  }
-
-  def toMetastoreType(dt: DataType): String = {
-    dt match {
-      case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
-      case StructType(fields) =>
-        s"struct<${
-          fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
-            .mkString(",")
-        }>"
-      case StringType => "string"
-      case FloatType => "float"
-      case IntegerType => "int"
-      case ShortType => "tinyint"
-      case DoubleType => "double"
-      case LongType => "bigint"
-      case BinaryType => "binary"
-      case BooleanType => "boolean"
-      case DecimalType() => "decimal"
-      case TimestampType => "timestamp"
-      case DateType => "date"
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 9dd8105..f00fcf8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand}
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
@@ -75,7 +76,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   protected lazy val startCommand: Parser[LogicalPlan] =
     loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords |
-    alterPartition | datamapManagement | alterTableFinishStreaming
+    alterPartition | datamapManagement | alterTableFinishStreaming | stream
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -89,6 +90,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val datamapManagement: Parser[LogicalPlan] =
     createDataMap | dropDataMap | showDataMap | refreshDataMap
 
+  protected lazy val stream: Parser[LogicalPlan] =
+    createStream | dropStream | showStreams
+
   protected lazy val alterAddPartition: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~>
       "(" ~> repsep(stringLit, ",") <~ ")") <~ opt(";") ^^ {
@@ -146,6 +150,43 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
 
   /**
+   * The syntax of CREATE STREAM
+   * CREATE STREAM [IF NOT EXISTS] streamName ON TABLE [dbName.]tableName
+   * [STMPROPERTIES('KEY'='VALUE')]
+   * AS SELECT COUNT(COL1) FROM tableName
+   */
+  protected lazy val createStream: Parser[LogicalPlan] =
+    CREATE ~> STREAM ~>  opt(IF ~> NOT ~> EXISTS) ~ ident ~
+    (ON ~> TABLE ~> (ident <~ ".").?) ~ ident ~
+    (STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
+    (AS ~> restInput) <~ opt(";") ^^ {
+      case ifNotExists ~ streamName ~ dbName ~ tableName ~ options ~ query =>
+        val optionMap = options.getOrElse(List[(String, String)]()).toMap[String, String]
+        CarbonCreateStreamCommand(
+          streamName, dbName, tableName, ifNotExists.isDefined, optionMap, query)
+    }
+
+  /**
+   * The syntax of DROP STREAM
+   * DROP STREAM [IF EXISTS] streamName
+   */
+  protected lazy val dropStream: Parser[LogicalPlan] =
+    DROP ~> STREAM ~> opt(IF ~> EXISTS) ~ ident <~ opt(";") ^^ {
+      case ifExists ~ streamName =>
+        CarbonDropStreamCommand(streamName, ifExists.isDefined)
+    }
+
+  /**
+   * The syntax of SHOW STREAMS
+   * SHOW STREAMS [ON TABLE dbName.tableName]
+   */
+  protected lazy val showStreams: Parser[LogicalPlan] =
+    SHOW ~> STREAMS ~> opt(ontable) <~ opt(";") ^^ {
+      case tableIdent =>
+        CarbonShowStreamsCommand(tableIdent)
+    }
+
+  /**
    * The syntax of datamap creation is as follows.
    * CREATE DATAMAP IF NOT EXISTS datamapName [ON TABLE tableName]
    * USING 'DataMapProviderName'
@@ -160,7 +201,6 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
     (AS ~> restInput).? <~ opt(";") ^^ {
       case ifnotexists ~ dmname ~ tableIdent ~ dmProviderName ~ deferred ~ dmprops ~ query =>
-
         val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, String]
         CarbonCreateDataMapCommand(dmname, tableIdent, dmProviderName, map, query,
           ifnotexists.isDefined, deferred.isDefined)
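
For readers of the grammar added above, here is a hedged end-to-end sketch of the
three new statements, written in the same style as the tests later in this commit.
The stream name ingest01 and the source/sink tables are illustrative, and a
SparkSession named spark is assumed.

    // CREATE STREAM starts a streaming job writing from source into sink
    spark.sql(
      """
        |CREATE STREAM IF NOT EXISTS ingest01 ON TABLE sink
        |STMPROPERTIES(
        |  'trigger'='ProcessingTime',
        |  'interval'='10 seconds')
        |AS SELECT * FROM source WHERE id % 2 = 1
      """.stripMargin)

    // SHOW STREAMS lists stream jobs, optionally restricted to one sink table
    spark.sql("SHOW STREAMS ON TABLE sink").show(false)

    // DROP STREAM stops the job; IF EXISTS suppresses the not-found error
    spark.sql("DROP STREAM IF EXISTS ingest01")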

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index b7b28b8..066819e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -120,8 +120,8 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
 
 
   def needToConvertToLowerCase(key: String): Boolean = {
-    val noConvertList = Array("LIST_INFO", "RANGE_INFO")
-    !noConvertList.exists(x => x.equalsIgnoreCase(key));
+    val noConvertList = Array("LIST_INFO", "RANGE_INFO", "PATH")
+    !noConvertList.exists(x => x.equalsIgnoreCase(key))
   }
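
A brief, hedged illustration of the "PATH" entry added to noConvertList above: the
'path' property of a streaming source table (see the tests further down) refers to a
file system location, which is presumably why it should keep its original case
instead of being lower-cased like other property values. The table name and path
below are illustrative, and a SparkSession named spark is assumed.

    spark.sql(
      """
        |CREATE TABLE source(id INT, name STRING)
        |STORED AS carbondata
        |TBLPROPERTIES(
        |  'streaming'='source',
        |  'format'='csv',
        |  'path'='/Data/CsvInput')
      """.stripMargin)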
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
deleted file mode 100644
index 65210b8..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.util.Objects
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.hive.CarbonMetastoreTypes
-import org.apache.spark.sql.types
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
-
-private[spark] object SparkTypeConverter {
-
-  def createSparkSchema(table: CarbonTable, columns: Seq[String]): StructType = {
-    Objects.requireNonNull(table)
-    Objects.requireNonNull(columns)
-    if (columns.isEmpty) {
-      throw new IllegalArgumentException("column list is empty")
-    }
-    val fields = new java.util.ArrayList[StructField](columns.size)
-    val allColumns = table.getTableInfo.getFactTable.getListOfColumns.asScala
-
-    // find the column and add it to fields array
-    columns.foreach { column =>
-      val col = allColumns.find(_.getColumnName.equalsIgnoreCase(column)).getOrElse(
-        throw new IllegalArgumentException(column + " does not exist")
-      )
-      fields.add(StructField(col.getColumnName, convertCarbonToSparkDataType(col, table)))
-    }
-    StructType(fields)
-  }
-
-  /**
-   * Converts from carbon datatype to corresponding spark datatype.
-   */
-  def convertCarbonToSparkDataType(
-      columnSchema: ColumnSchema,
-      table: CarbonTable): types.DataType = {
-    if (CarbonDataTypes.isDecimal(columnSchema.getDataType)) {
-      val scale = columnSchema.getScale
-      val precision = columnSchema.getPrecision
-      if (scale == 0 && precision == 0) {
-        DecimalType(18, 2)
-      } else {
-        DecimalType(precision, scale)
-      }
-    } else if (CarbonDataTypes.isArrayType(columnSchema.getDataType)) {
-      CarbonMetastoreTypes
-        .toDataType(s"array<${ getArrayChildren(table, columnSchema.getColumnName) }>")
-    } else if (CarbonDataTypes.isStructType(columnSchema.getDataType)) {
-      CarbonMetastoreTypes
-        .toDataType(s"struct<${ getStructChildren(table, columnSchema.getColumnName) }>")
-    } else {
-      columnSchema.getDataType match {
-        case CarbonDataTypes.STRING => StringType
-        case CarbonDataTypes.SHORT => ShortType
-        case CarbonDataTypes.INT => IntegerType
-        case CarbonDataTypes.LONG => LongType
-        case CarbonDataTypes.DOUBLE => DoubleType
-        case CarbonDataTypes.BOOLEAN => BooleanType
-        case CarbonDataTypes.TIMESTAMP => TimestampType
-        case CarbonDataTypes.DATE => DateType
-      }
-    }
-  }
-
-  def getArrayChildren(table: CarbonTable, dimName: String): String = {
-    table.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.getName.toLowerCase match {
-        case "array" => s"array<${ getArrayChildren(table, childDim.getColName) }>"
-        case "struct" => s"struct<${ getStructChildren(table, childDim.getColName) }>"
-        case dType => addDecimalScaleAndPrecision(childDim, dType)
-      }
-    }).mkString(",")
-  }
-
-  def getStructChildren(table: CarbonTable, dimName: String): String = {
-    table.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.getName.toLowerCase match {
-        case "array" => s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:array<${ getArrayChildren(table, childDim.getColName) }>"
-        case "struct" => s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:struct<${ table.getChildren(childDim.getColName)
-          .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",")
-        }>"
-        case dType => s"${ childDim.getColName
-          .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
-      }
-    }).mkString(",")
-  }
-
-  def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
-    var dType = dataType
-    if (CarbonDataTypes.isDecimal(dimval.getDataType)) {
-      dType +=
-      "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
-    }
-    dType
-  }
-
-  private def recursiveMethod(
-      table: CarbonTable, dimName: String, childDim: CarbonDimension) = {
-    childDim.getDataType.getName.toLowerCase match {
-      case "array" => s"${
-        childDim.getColName.substring(dimName.length + 1)
-      }:array<${ getArrayChildren(table, childDim.getColName) }>"
-      case "struct" => s"${
-        childDim.getColName.substring(dimName.length + 1)
-      }:struct<${ getStructChildren(table, childDim.getColName) }>"
-      case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ea3b2dc/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 3253c3d..9d9a9f5 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -25,13 +25,15 @@ import java.util.concurrent.Executors
 
 import scala.collection.mutable
 
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.{CarbonEnv, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.NoSuchStreamException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
@@ -120,7 +122,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     createTable(tableName = "agg_table", streaming = true, withBatchLoad = false)
 
-    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    var csvDataDir = integrationPath + "/spark2/target/csvdatanew"
     generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
     generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append)
   }
@@ -188,6 +190,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     dropTable()
     sql("USE default")
     sql("DROP DATABASE IF EXISTS streaming CASCADE")
+    var csvDataDir = integrationPath + "/spark2/target/csvdatanew"
+    new File(csvDataDir).delete()
+    csvDataDir = integrationPath + "/spark2/target/csvdata"
+    new File(csvDataDir).delete()
   }
 
   def dropTable(): Unit = {
@@ -362,12 +368,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("alter table agg_table2 compact 'streaming'")
     // Data should be loaded into aggregate table as hand-off is fired
     checkAnswer(sql("select name, sum(salary) from agg_table2 group by name"),
-        Seq(
-          Row("name_10", 400000.0),
-          Row("name_14", 560000.0),
-          Row("name_12", 480000.0),
-          Row("name_11", 440000.0),
-          Row("name_13", 520000.0)))
+      Seq(
+        Row("name_10", 400000.0),
+        Row("name_14", 560000.0),
+        Row("name_12", 480000.0),
+        Row("name_11", 440000.0),
+        Row("name_13", 520000.0)))
     checkAnswer(sql("select * from agg_table2_p1"),
       Seq(
         Row("name_10", 200000.0),
@@ -1430,7 +1436,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
     val table1 =
       CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark)
-    assertResult(true)(table1.isStreamingTable)
+    assertResult(true)(table1.isStreamingSink)
 
     sql("alter table streaming.stream_table_reopen compact 'close_streaming'")
 
@@ -1447,13 +1453,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     val table2 =
       CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark)
-    assertResult(false)(table2.isStreamingTable)
+    assertResult(false)(table2.isStreamingSink)
 
     sql("ALTER TABLE streaming.stream_table_reopen SET TBLPROPERTIES('streaming'='true')")
 
     val table3 =
       CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark)
-    assertResult(true)(table3.isStreamingTable)
+    assertResult(true)(table3.isStreamingSink)
 
     executeStreamingIngest(
       tableName = "stream_table_reopen",
@@ -1569,8 +1575,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
 
   test("test bad_record_action IGNORE on streaming table") {
-
-sql("drop table if exists streaming.bad_record_ignore")
+    sql("drop table if exists streaming.bad_record_ignore")
     sql(
       s"""
          | CREATE TABLE streaming.bad_record_ignore(
@@ -1628,6 +1633,334 @@ sql("drop table if exists streaming.bad_record_ignore")
     checkAnswer(sql("select count(*) from streaming.bad_record_redirect"), Seq(Row(19)))
   }
 
+  test("StreamSQL: create and drop a stream") {
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+
+    var rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    val csvDataDir = integrationPath + "/spark2/target/streamSql"
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
+
+    sql(
+      s"""
+         |CREATE TABLE source(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES (
+         | 'streaming'='source',
+         | 'format'='csv',
+         | 'path'='$csvDataDir'
+         |)
+      """.stripMargin)
+
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         | )
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='sink')
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE STREAM stream123 ON TABLE sink
+        |STMPROPERTIES(
+        |  'trigger'='ProcessingTime',
+        |  'interval'='1 seconds')
+        |AS
+        |  SELECT *
+        |  FROM source
+        |  WHERE id % 2 = 1
+      """.stripMargin).show(false)
+    sql(
+      """
+        |CREATE STREAM IF NOT EXISTS stream123 ON TABLE sink
+        |STMPROPERTIES(
+        |  'trigger'='ProcessingTime',
+        |  'interval'='1 seconds')
+        |AS
+        |  SELECT *
+        |  FROM source
+        |  WHERE id % 2 = 1
+      """.stripMargin).show(false)
+    Thread.sleep(200)
+    sql("select * from sink").show
+
+    generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append)
+    Thread.sleep(5000)
+
+    // after 2 mini-batches, there should be 10 rows added (filter condition: id % 2 = 1)
+    checkAnswer(sql("select count(*) from sink"), Seq(Row(10)))
+
+    val row = sql("select * from sink order by id").head()
+    val expectedRow = Row(11, "name_11", "city_11", 110000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))
+    assertResult(expectedRow)(row)
+
+    sql("SHOW STREAMS").show(false)
+
+    rows = sql("SHOW STREAMS").collect()
+    assertResult(1)(rows.length)
+    assertResult("stream123")(rows.head.getString(0))
+    assertResult("RUNNING")(rows.head.getString(2))
+    assertResult("streaming.source")(rows.head.getString(3))
+    assertResult("streaming.sink")(rows.head.getString(4))
+
+    rows = sql("SHOW STREAMS ON TABLE sink").collect()
+    assertResult(1)(rows.length)
+    assertResult("stream123")(rows.head.getString(0))
+    assertResult("RUNNING")(rows.head.getString(2))
+    assertResult("streaming.source")(rows.head.getString(3))
+    assertResult("streaming.sink")(rows.head.getString(4))
+
+    sql("DROP STREAM stream123")
+    sql("DROP STREAM IF EXISTS stream123")
+
+    rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+  }
+
+  test("StreamSQL: create stream without interval ") {
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+
+    val csvDataDir = integrationPath + "/spark2/target/streamsql"
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
+
+    sql(
+      s"""
+         |CREATE TABLE source(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES (
+         | 'streaming'='source',
+         | 'format'='csv',
+         | 'path'='$csvDataDir'
+         |)
+      """.stripMargin)
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         | )
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='sink')
+      """.stripMargin)
+    val ex = intercept[MalformedCarbonCommandException] {
+      sql(
+        """
+          |CREATE STREAM stream456 ON TABLE sink
+          |STMPROPERTIES(
+          |  'trigger'='ProcessingTime')
+          |AS
+          |  SELECT *
+          |  FROM source
+          |  WHERE id % 2 = 1
+        """.stripMargin)
+    }
+    assert(ex.getMessage.contains("interval must be specified"))
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+  }
+
+  test("StreamSQL: create stream on non exist stream source table") {
+    sql("DROP TABLE IF EXISTS sink")
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         | )
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='true')
+      """.stripMargin)
+
+    val ex = intercept[AnalysisException] {
+      sql(
+        """
+          |CREATE STREAM stream123 ON TABLE sink
+          |STMPROPERTIES(
+          |  'trigger'='ProcessingTime',
+          |  'interval'='1 seconds')
+          |AS
+          |  SELECT *
+          |  FROM source
+          |  WHERE id % 2 = 1
+        """.stripMargin).show(false)
+    }
+    sql("DROP TABLE IF EXISTS sink")
+  }
+
+  test("StreamSQL: create stream source using carbon file") {
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+
+    sql(
+      s"""
+         |CREATE TABLE source(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES (
+         | 'streaming'='source'
+         |)
+      """.stripMargin)
+
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         | )
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='sink')
+      """.stripMargin)
+
+    val ex = intercept[MalformedCarbonCommandException] {
+      sql(
+        """
+          |CREATE STREAM stream123 ON TABLE sink
+          |STMPROPERTIES(
+          |  'trigger'='ProcessingTime',
+          |  'interval'='1 seconds')
+          |AS
+          |  SELECT *
+          |  FROM source
+          |  WHERE id % 2 = 1
+        """.stripMargin)
+    }
+    assert(ex.getMessage.contains("Streaming from carbon file is not supported"))
+
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+  }
+
+  test("StreamSQL: start stream on non-stream table") {
+    sql(
+      s"""
+         |CREATE TABLE notsource(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         | )
+         |STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         | )
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='true')
+      """.stripMargin)
+
+    val ex = intercept[MalformedCarbonCommandException] {
+      sql(
+        """
+          |CREATE STREAM stream456 ON TABLE sink
+          |STMPROPERTIES(
+          |  'trigger'='ProcessingTime',
+          |  'interval'='1 seconds')
+          |AS
+          |  SELECT *
+          |  FROM notsource
+          |  WHERE id % 2 = 1
+        """.stripMargin).show(false)
+    }
+    assert(ex.getMessage.contains("Must specify stream source table in the query"))
+    sql("DROP TABLE sink")
+  }
+
+  test("StreamSQL: drop stream on non exist table") {
+    val ex = intercept[NoSuchStreamException] {
+      sql("DROP STREAM streamyyy")
+    }
+    assert(ex.getMessage.contains("stream 'streamyyy' not found"))
+  }
+
+  test("StreamSQL: show streams on non-exist table") {
+    val ex = intercept[NoSuchTableException] {
+      sql("SHOW STREAMS ON TABLE ddd")
+    }
+    assert(ex.getMessage.contains("'ddd' not found"))
+  }
 
   def createWriteSocketThread(
       serverSocket: ServerSocket,