You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/09/13 08:44:19 UTC

carbondata git commit: [CARBONDATA-2919] Support ingest from Kafka in StreamSQL

Repository: carbondata
Updated Branches:
  refs/heads/master 4c692d185 -> 560bfbe77


[CARBONDATA-2919] Support ingest from Kafka in StreamSQL

This closes #2695


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/560bfbe7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/560bfbe7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/560bfbe7

Branch: refs/heads/master
Commit: 560bfbe778a29bad285a6388686d888a3e877e33
Parents: 4c692d1
Author: Jacky Li <ja...@qq.com>
Authored: Tue Sep 11 12:17:55 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Thu Sep 13 16:42:20 2018 +0800

----------------------------------------------------------------------
 .../core/locks/AbstractCarbonLock.java          |  10 +
 .../carbondata/core/locks/HdfsFileLock.java     |   4 -
 .../carbondata/core/locks/ICarbonLock.java      |   5 +
 .../carbondata/core/locks/LocalFileLock.java    |   5 -
 .../carbondata/core/locks/S3FileLock.java       |   4 -
 .../core/metadata/schema/table/CarbonTable.java |   8 +
 .../carbondata/core/util/SessionParams.java     |   2 +
 datamap/mv/plan/pom.xml                         |   6 +
 examples/spark2/pom.xml                         |   4 +
 .../carbondata/examples/StreamSQLExample.scala  | 107 ++++++++++
 .../examples/StructuredStreamingExample.scala   |   1 +
 .../org/apache/carbondata/spark/util/Util.java  |  11 +
 .../org/apache/carbondata/api/CarbonStore.scala |  43 ++--
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |   2 +
 .../streaming/CarbonAppendableStreamSink.scala  |   1 +
 .../datasources/CarbonSparkDataSourceUtil.scala |  16 +-
 integration/spark2/pom.xml                      |   6 +
 .../carbondata/spark/util/CarbonSparkUtil.scala |  10 +
 .../carbondata/stream/StreamJobManager.scala    |  43 ++--
 .../CarbonAlterTableCompactionCommand.scala     |   5 +-
 .../management/CarbonShowLoadsCommand.scala     |  10 +-
 .../stream/CarbonCreateStreamCommand.scala      | 213 ++++++++++++++++---
 .../TestStreamingTableOperation.scala           |  21 +-
 pom.xml                                         |   6 +
 .../loading/parser/impl/RowParserImpl.java      |   3 +
 .../streaming/segment/StreamSegment.java        |   2 +-
 26 files changed, 451 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java b/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
index 4aa0a18..37f77b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
@@ -30,8 +30,18 @@ public abstract class AbstractCarbonLock implements ICarbonLock {
 
   private int retryTimeout;
 
+  /**
+   * lockFilePath is the location of the lock file.
+   */
+  protected String lockFilePath;
+
   public abstract boolean lock();
 
+  @Override
+  public String getLockFilePath() {
+    return this.lockFilePath;
+  }
+
   /**
    * API for enabling the locking of file with retries.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index ade4212..bc65ece 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -33,10 +33,6 @@ public class HdfsFileLock extends AbstractCarbonLock {
 
   private static final LogService LOGGER =
              LogServiceFactory.getLogService(HdfsFileLock.class.getName());
-  /**
-   * lockFilePath is the location of the lock file.
-   */
-  private String lockFilePath;
 
   /**
    * lockFileDir is the directory of the lock file.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java b/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
index e964f0c..ab20a5e 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
@@ -51,4 +51,9 @@ public interface ICarbonLock {
    */
   boolean releaseLockManually(String lockFile);
 
+  /**
+   * Return the path to the lock file
+   * @return lock file path
+   */
+  String getLockFilePath();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index 1148ae2..5e3033e 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -36,11 +36,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
  */
 public class LocalFileLock extends AbstractCarbonLock {
   /**
-   * lockFilePath is the location of the lock file.
-   */
-  private String lockFilePath;
-
-  /**
    * lockFileDir is the directory of the lock file.
    */
   private String lockFileDir;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
index 464becb..10bab28 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
@@ -34,10 +34,6 @@ public class S3FileLock extends AbstractCarbonLock {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(S3FileLock.class.getName());
-  /**
-   * lockFilePath is the location of the lock file.
-   */
-  private String lockFilePath;
 
   /**
    * lockFileDir is the directory of the lock file.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 21f24d6..c606063 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1186,6 +1186,14 @@ public class CarbonTable implements Serializable {
   }
 
   /**
+   * Return the format value defined in table properties
+   * @return String as per table properties, null if not defined
+   */
+  public String getFormat() {
+    return getTableInfo().getFactTable().getTableProperties().get("format");
+  }
+
+  /**
    * Method to get the list of cached columns of the table.
    * This method need to be used for Describe formatted like scenario where columns need to be
    * displayed in the column create order

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 2abf0e1..931e106 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -36,6 +36,7 @@ import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_OFFHEAP_SORT;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
@@ -162,6 +163,7 @@ public class SessionParams implements Serializable, Cloneable {
       case CARBON_SEARCH_MODE_ENABLE:
       case ENABLE_VECTOR_READER:
       case ENABLE_UNSAFE_IN_QUERY_EXECUTION:
+      case ENABLE_AUTO_LOAD_MERGE:
         isValid = CarbonUtil.validateBoolean(value);
         if (!isValid) {
           throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/datamap/mv/plan/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/pom.xml b/datamap/mv/plan/pom.xml
index ff6976d..982724d 100644
--- a/datamap/mv/plan/pom.xml
+++ b/datamap/mv/plan/pom.xml
@@ -48,6 +48,12 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index 91cb20f..bd497c5 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -55,6 +55,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
new file mode 100644
index 0000000..58f51bd
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+import java.net.ServerSocket
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+// scalastyle:off println
+object StreamSQLExample {
+  def main(args: Array[String]) {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+
+    val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS sink")
+      spark.sql("DROP TABLE IF EXISTS source")
+
+      // Create target carbon table and populate with initial data
+      spark.sql(
+        s"""
+           | CREATE TABLE sink(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT,
+           | file struct<school:array<string>, age:int>
+           | )
+           | STORED AS carbondata
+           | TBLPROPERTIES(
+           | 'streaming'='true', 'sort_columns'='')
+          """.stripMargin)
+    }
+
+    spark.sql(
+      """
+        | CREATE TABLE source (
+        | id INT,
+        | name STRING,
+        | city STRING,
+        | salary FLOAT,
+        | file struct<school:array<string>, age:int>
+        | )
+        | STORED AS carbondata
+        | TBLPROPERTIES(
+        | 'streaming'='source',
+        | 'format'='socket',
+        | 'host'='localhost',
+        | 'port'='7071')
+      """.stripMargin)
+
+    val serverSocket = new ServerSocket(7071)
+
+    // start ingest streaming job
+    spark.sql(
+      s"""
+        | CREATE STREAM ingest ON TABLE sink
+        | STMPROPERTIES(
+        | 'trigger' = 'ProcessingTime',
+        | 'interval' = '3 seconds')
+        | AS SELECT * FROM source
+      """.stripMargin)
+
+    // start writing data into the socket
+    import StructuredStreamingExample.{showTableCount, writeSocket}
+    val thread1 = writeSocket(serverSocket)
+    val thread2 = showTableCount(spark, "sink")
+
+    System.out.println("type enter to interrupt streaming")
+    System.in.read()
+    thread1.interrupt()
+    thread2.interrupt()
+    serverSocket.close()
+
+    // stop streaming job
+    spark.sql("DROP STREAM ingest").show
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+}
+
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
index f88d8ee..31de668 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
@@ -130,6 +130,7 @@ object StructuredStreamingExample {
       override def run(): Unit = {
         for (_ <- 0 to 1000) {
           spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          spark.sql(s"show segments for table $tableName").show
           Thread.sleep(1000 * 3)
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
index b4c8250..832a1b2 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
@@ -83,6 +84,16 @@ public class Util {
     }
   }
 
+  public static StructType convertToSparkSchema(CarbonTable table) {
+    List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
+    ColumnSchema[] schema = new ColumnSchema[columns.size()];
+    int i = 0;
+    for (CarbonColumn column : columns) {
+      schema[i++] = column.getColumnSchema();
+    }
+    return convertToSparkSchema(table, schema);
+  }
+
   public static StructType convertToSparkSchema(CarbonTable table, ColumnSchema[] carbonColumns) {
     List<StructField> fields = new ArrayList<>(carbonColumns.length);
     for (int i = 0; i < carbonColumns.length; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 02c8607..3864b5d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -37,21 +37,23 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.segment.StreamSegment
 
 object CarbonStore {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   def showSegments(
       limit: Option[String],
-      tableFolderPath: String,
+      tablePath: String,
       showHistory: Boolean): Seq[Row] = {
+    val metaFolder = CarbonTablePath.getMetadataPath(tablePath)
     val loadMetadataDetailsArray = if (showHistory) {
-      SegmentStatusManager.readLoadMetadata(tableFolderPath) ++
-      SegmentStatusManager.readLoadHistoryMetadata(tableFolderPath)
+      SegmentStatusManager.readLoadMetadata(metaFolder) ++
+      SegmentStatusManager.readLoadHistoryMetadata(metaFolder)
     } else {
-      SegmentStatusManager.readLoadMetadata(tableFolderPath)
+      SegmentStatusManager.readLoadMetadata(metaFolder)
     }
 
     if (loadMetadataDetailsArray.nonEmpty) {
@@ -84,18 +86,31 @@ object CarbonStore {
 
           val startTime =
             if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
-              null
+              "NA"
             } else {
-              new java.sql.Timestamp(load.getLoadStartTime)
+              new java.sql.Timestamp(load.getLoadStartTime).toString
             }
 
           val endTime =
             if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
-              null
+              "NA"
             } else {
-              new java.sql.Timestamp(load.getLoadEndTime)
+              new java.sql.Timestamp(load.getLoadEndTime).toString
             }
 
+          val (dataSize, indexSize) = if (load.getFileFormat == FileFormat.ROW_V1) {
+            // for streaming segment, we should get the actual size from the index file
+            // since it is continuously inserting data
+            val segmentDir = CarbonTablePath.getSegmentPath(tablePath, load.getLoadName)
+            val indexPath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir)
+            val indices = StreamSegment.readIndexFile(indexPath, FileFactory.getFileType(indexPath))
+            (indices.asScala.map(_.getFile_size).sum, FileFactory.getCarbonFile(indexPath).getSize)
+          } else {
+            // for batch segment, we can get the data size from table status file directly
+            (if (load.getDataSize == null) 0L else load.getDataSize.toLong,
+              if (load.getIndexSize == null) 0L else load.getIndexSize.toLong)
+          }
+
           if (showHistory) {
             Row(
               load.getLoadName,
@@ -104,9 +119,9 @@ object CarbonStore {
               endTime,
               mergedTo,
               load.getFileFormat.toString,
-              load.getVisibility(),
-              Strings.formatSize(if (load.getDataSize == null) 0 else load.getDataSize.toFloat),
-              Strings.formatSize(if (load.getIndexSize == null) 0 else load.getIndexSize.toFloat))
+              load.getVisibility,
+              Strings.formatSize(dataSize.toFloat),
+              Strings.formatSize(indexSize.toFloat))
           } else {
             Row(
               load.getLoadName,
@@ -115,8 +130,8 @@ object CarbonStore {
               endTime,
               mergedTo,
               load.getFileFormat.toString,
-              Strings.formatSize(if (load.getDataSize == null) 0 else load.getDataSize.toFloat),
-              Strings.formatSize(if (load.getIndexSize == null) 0 else load.getIndexSize.toFloat))
+              Strings.formatSize(dataSize.toFloat),
+              Strings.formatSize(indexSize.toFloat))
           }
         }.toSeq
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index ab6e320..57b2e44 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -410,6 +410,8 @@ object StreamHandoffRDD {
         } else {
           newSegment.get.setSegmentStatus(SegmentStatus.SUCCESS)
           newSegment.get.setLoadEndTime(System.currentTimeMillis())
+          CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newSegment.get, loadModel.getSegmentId,
+            loadModel.getCarbonDataLoadSchema.getCarbonTable)
         }
 
         // update streaming segment to compacted status

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 5762906..6d93b34 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.rdd.StreamHandoffRDD

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 00a5139..73c07b4 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
+import org.apache.carbondata.core.metadata.datatype.{ArrayType => CarbonArrayType, DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, MapType => CarbonMapType, StructField => CarbonStructField, StructType => CarbonStructType}
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
@@ -39,6 +39,20 @@ object CarbonSparkDataSourceUtil {
       DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
         dataType.asInstanceOf[CarbonDecimalType].getScale)
     } else {
+      if (CarbonDataTypes.isStructType(dataType)) {
+        val struct = dataType.asInstanceOf[CarbonStructType]
+        return StructType(struct.getFields.asScala.map(x =>
+          StructField(x.getFieldName, convertCarbonToSparkDataType(x.getDataType)))
+        )
+      } else if (CarbonDataTypes.isArrayType(dataType)) {
+        val array = dataType.asInstanceOf[CarbonArrayType]
+        return ArrayType(convertCarbonToSparkDataType(array.getElementType))
+      } else if (CarbonDataTypes.isMapType(dataType)) {
+        val map = dataType.asInstanceOf[CarbonMapType]
+        return MapType(
+          convertCarbonToSparkDataType(map.getKeyType),
+          convertCarbonToSparkDataType(map.getValueType))
+      }
       dataType match {
         case CarbonDataTypes.STRING => StringType
         case CarbonDataTypes.SHORT => ShortType

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 5af4fbe..8c8fd28 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -109,6 +109,12 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>${spark.deps.scope}</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index a0c0545..b2687d0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -107,4 +107,14 @@ object CarbonSparkUtil {
     })
     fields.mkString(",")
   }
+
+  /**
+   * add escape prefix for delimiter
+   */
+  def delimiterConverter4Udf(delimiter: String): String = delimiter match {
+    case "|" | "*" | "." | ":" | "^" | "\\" | "$" | "+" | "?" | "(" | ")" | "{" | "}" | "[" | "]" =>
+      "\\\\" + delimiter
+    case _ =>
+      delimiter
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
index f0d7bf5..23323d4 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, DataFrame, SparkSession}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -29,7 +29,9 @@ import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.carbondata.common.exceptions.NoSuchStreamException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.spark.StreamingOption
 import org.apache.carbondata.streaming.CarbonStreamException
 
@@ -46,25 +48,29 @@ object StreamJobManager {
 
   private def validateSourceTable(source: CarbonTable): Unit = {
     if (!source.isStreamingSource) {
-      throw new MalformedCarbonCommandException(s"Table ${source.getTableName} is not " +
-                                                "streaming source table " +
-                                                "('streaming' tblproperty is not 'source')")
+      throw new MalformedCarbonCommandException(
+        s"Table ${source.getTableName} is not streaming source table " +
+        "('streaming' tblproperty is not 'source')")
     }
   }
 
-  private def validateSinkTable(querySchema: StructType, sink: CarbonTable): Unit = {
+  private def validateSinkTable(validateQuerySchema: Boolean,
+                                querySchema: StructType, sink: CarbonTable): Unit = {
     if (!sink.isStreamingSink) {
-      throw new MalformedCarbonCommandException(s"Table ${sink.getTableName} is not " +
-                                                "streaming sink table " +
-                                                "('streaming' tblproperty is not 'sink' or 'true')")
+      throw new MalformedCarbonCommandException(
+        s"Table ${sink.getTableName} is not streaming sink table " +
+        "('streaming' tblproperty is not 'sink' or 'true')")
     }
-    val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
-      StructField(column.getColName,
-        CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
-    }
-    if (!querySchema.equals(StructType(fields))) {
-      throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
-                                                s"does not match query output")
+    if (validateQuerySchema) {
+      val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
+        StructField(
+          column.getColName,
+          CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
+      }
+      if (!querySchema.equals(StructType(fields))) {
+        throw new MalformedCarbonCommandException(
+          s"Schema of table ${sink.getTableName} does not match query output")
+      }
     }
   }
 
@@ -102,7 +108,12 @@ object StreamJobManager {
     }
 
     validateSourceTable(sourceTable)
-    validateSinkTable(streamDf.schema, sinkTable)
+
+    // for kafka and socket stream source, the source table schema is one string column only
+    // so we do not validate the query schema against the sink table schema
+    val isKafka = Option(sourceTable.getFormat).contains("kafka")
+    val isSocket = Option(sourceTable.getFormat).contains("socket")
+    validateSinkTable(!(isKafka || isSocket), streamDf.schema, sinkTable)
 
     // start a new thread to run the streaming ingest job, the job will be running
     // until user stops it by STOP STREAM JOB

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index a13dfdc..8b6dabd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -330,10 +330,13 @@ case class CarbonAlterTableCompactionCommand(
   ): Unit = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    // 1. acquire lock of streaming.lock
+    // 1. delete the lock of streaming.lock, forcing the stream to be closed
     val streamingLock = CarbonLockFactory.getCarbonLockObj(
       carbonTable.getTableInfo.getOrCreateAbsoluteTableIdentifier,
       LockUsage.STREAMING_LOCK)
+    if (!FileFactory.getCarbonFile(streamingLock.getLockFilePath).delete()) {
+       LOGGER.warn("failed to delete lock file: " + streamingLock.getLockFilePath)
+    }
     try {
       if (streamingLock.lockWithRetries()) {
         // 2. convert segment status from "streaming" to "streaming finish"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 3d57a99..3f68cc4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -37,8 +37,8 @@ case class CarbonShowLoadsCommand(
     if (showHistory) {
       Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
         AttributeReference("Status", StringType, nullable = false)(),
-        AttributeReference("Load Start Time", TimestampType, nullable = false)(),
-        AttributeReference("Load End Time", TimestampType, nullable = true)(),
+        AttributeReference("Load Start Time", StringType, nullable = false)(),
+        AttributeReference("Load End Time", StringType, nullable = true)(),
         AttributeReference("Merged To", StringType, nullable = false)(),
         AttributeReference("File Format", StringType, nullable = false)(),
         AttributeReference("Visibility", StringType, nullable = false)(),
@@ -47,8 +47,8 @@ case class CarbonShowLoadsCommand(
     } else {
       Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
         AttributeReference("Status", StringType, nullable = false)(),
-        AttributeReference("Load Start Time", TimestampType, nullable = false)(),
-        AttributeReference("Load End Time", TimestampType, nullable = true)(),
+        AttributeReference("Load Start Time", StringType, nullable = false)(),
+        AttributeReference("Load End Time", StringType, nullable = true)(),
         AttributeReference("Merged To", StringType, nullable = false)(),
         AttributeReference("File Format", StringType, nullable = false)(),
         AttributeReference("Data Size", StringType, nullable = false)(),
@@ -64,7 +64,7 @@ case class CarbonShowLoadsCommand(
     }
     CarbonStore.showSegments(
       limit,
-      carbonTable.getMetadataPath,
+      carbonTable.getTablePath,
       showHistory
     )
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
index 1e1ab44..94e063b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.execution.command.stream
 
+import java.util
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, ExprId, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.DataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.streaming.StreamingRelation
@@ -29,8 +32,9 @@ import org.apache.spark.sql.types.{StringType, StructType}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.spark.StreamingOption
-import org.apache.carbondata.spark.util.Util
+import org.apache.carbondata.spark.util.{CarbonSparkUtil, Util}
 import org.apache.carbondata.stream.StreamJobManager
 
 /**
@@ -51,29 +55,57 @@ case class CarbonCreateStreamCommand(
       AttributeReference("Status", StringType, nullable = false)())
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val df = sparkSession.sql(query)
-    var sourceTable: CarbonTable = null
-
-    // find the streaming source table in the query
-    // and replace it with StreamingRelation
-    val streamLp = df.logicalPlan transform {
+    val inputQuery = sparkSession.sql(query)
+    val sourceTableSeq = inputQuery.logicalPlan collect {
       case r: LogicalRelation
         if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
-        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
-        if (sourceTable != null && sourceTable.getTableName != source.getTableName) {
-          throw new MalformedCarbonCommandException(
-            "Stream query on more than one stream source table is not supported")
-        }
-        sourceTable = source
-        streamingRelation
-      case plan: LogicalPlan => plan
+        r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+    }
+    if (sourceTableSeq.isEmpty) {
+      throw new MalformedCarbonCommandException(
+        "Must specify stream source table in the stream query")
+    }
+    if (sourceTableSeq.size > 1) {
+      throw new MalformedCarbonCommandException(
+        "Stream query on more than one stream source table is not supported")
+    }
+    val sourceTable = sourceTableSeq.head
+
+    val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
+    val format = sourceTable.getFormat
+    if (format == null) {
+      throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
+    }
+    val updatedQuery = if (format.equals("kafka")) {
+      shouldHaveProperty(tblProperty, "kafka.bootstrap.servers", sourceTable)
+      shouldHaveProperty(tblProperty, "subscribe", sourceTable)
+      createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty)
+    } else if (format.equals("socket")) {
+      shouldHaveProperty(tblProperty, "host", sourceTable)
+      shouldHaveProperty(tblProperty, "port", sourceTable)
+      createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty)
+    } else {
+      // Replace the logical relation with a streaming relation created
+      // from the stream source table
+      inputQuery.logicalPlan transform {
+        case r: LogicalRelation
+          if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+             r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource
+        => prepareStreamingRelation(sparkSession, r)
+        case plan: LogicalPlan => plan
+      }
     }
 
     if (sourceTable == null) {
       throw new MalformedCarbonCommandException("Must specify stream source table in the query")
     }
 
+    // always take the CSV delimiter from the source table's 'delimiter' tblproperty (default ",")
+    val newMap = mutable.Map[String, String]()
+    optionMap.foreach(x => newMap(x._1) = x._2)
+    newMap(CSVInputFormat.DELIMITER) = tblProperty.asScala.getOrElse("delimiter", ",")
+
     // start the streaming job
     val jobId = StreamJobManager.startStream(
       sparkSession = sparkSession,
@@ -82,33 +114,140 @@ case class CarbonCreateStreamCommand(
       sourceTable = sourceTable,
       sinkTable = CarbonEnv.getCarbonTable(sinkDbName, sinkTableName)(sparkSession),
       query = query,
-      streamDf = Dataset.ofRows(sparkSession, streamLp),
-      options = new StreamingOption(optionMap)
+      streamDf = Dataset.ofRows(sparkSession, updatedQuery),
+      options = new StreamingOption(newMap.toMap)
     )
     Seq(Row(streamName, jobId, "RUNNING"))
   }
 
+  /**
+   * Create a new plan for the stream query on a kafka or socket source table.
+   * This is required because the stream delivers each record in one 'value' column, which
+   * has to be converted to the schema of the source table.
+   * The returned logical plan contains the complete plan tree of original plan with
+   * logical relation replaced with a streaming plan, which is built by 3 projections:
+   *   1. cast the 'value' column of the stream to string, as '_value'
+   *   2. split '_value' by the 'delimiter' table property (default ","), as '_values'
+   *   3. pick the elements of '_values' by position, named by the source table columns
+   *
+   * @param sparkSession spark session
+   * @param inputQuery stream query from user
+   * @param sourceTable source table (kafka or socket table)
+   * @param sourceName source name, kafka or socket
+   * @param tblProperty table property of source table, also passed as stream reader options
+   * @return a new logical plan
+   */
+  private def createPlan(
+      sparkSession: SparkSession,
+      inputQuery: DataFrame,
+      sourceTable: CarbonTable,
+      sourceName: String,
+      tblProperty: util.Map[String, String]): LogicalPlan = {
+    // We follow 3 steps to generate new plan
+    // 1. replace the logical relation in stream query with streaming relation
+    // 2. collect the new ExprId generated
+    // 3. update the stream query plan with the new ExprId generated, to make the plan consistent
+
+    // exprList picks each column by position from the '_values' array produced by split(),
+    // a column is null when the record has fewer fields than the source table schema
+    val columnNames = Util.convertToSparkSchema(sourceTable).fieldNames
+    val exprList = columnNames.zipWithIndex.map {
+      case (columnName, i) =>
+        s"case when size(_values) > $i then _values[$i] else null end AS $columnName"
+    }
+
+    // delimiter of the fields in one record, comma if the table property is not set
+    val delimiter = tblProperty.asScala.getOrElse("delimiter", ",")
+    // attribute name -> ExprId generated by the projections of the streaming plan
+    val aliasMap = new util.HashMap[String, ExprId]()
+    val updatedQuery = inputQuery.logicalPlan transform {
+      case r: LogicalRelation
+        if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
+        // get the 'value' column of the stream source and split it by the delimiter,
+        // the delimiter gets an escape prefix since it is embedded in the split() pattern
+        val kafkaPlan = sparkSession.readStream
+          .format(sourceName)
+          .options(tblProperty)
+          .load()
+          .selectExpr("CAST(value as string) as _value")
+          .selectExpr(
+            s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values")
+          .selectExpr(exprList: _*)
+          .logicalPlan
+
+        // collect the newly generated ExprId of every projected expression, by name
+        kafkaPlan foreach {
+          case Project(projectList, _) =>
+            projectList.foreach(expr => aliasMap.put(expr.name, expr.exprId))
+          case _ =>
+        }
+        kafkaPlan
+      case plan: LogicalPlan => plan
+    }
+
+    // transform the stream plan to replace all attribute with the collected ExprId
+    // NOTE(review): only Project and Filter operators are rewritten here, attributes
+    // referenced by any other operator keep their original ExprId -- confirm that
+    // stream queries using other operators are not expected on kafka/socket sources
+    val rebind = rebindAttribute(aliasMap)
+    updatedQuery transform {
+      case Project(projectList, child) =>
+        val newProjectList = projectList.map { expr =>
+          expr.transform(rebind).asInstanceOf[NamedExpression]
+        }
+        Project(newProjectList, child)
+      case Filter(condition, child) =>
+        Filter(condition.transform(rebind), child)
+    }
+  }
+
+  /**
+   * Create the rule which re-binds an attribute to the ExprId collected for its name.
+   * Attributes whose name is not in the map, or which already carry the collected
+   * ExprId, are left unchanged.
+   *
+   * @param aliasMap attribute name to the ExprId collected from the streaming plan
+   * @return rule to be used in transform of an expression
+   */
+  private def rebindAttribute(
+      aliasMap: util.Map[String, ExprId]): PartialFunction[Expression, Expression] = {
+    case attribute: Attribute =>
+      val exprId: ExprId = aliasMap.get(attribute.name)
+      if (exprId != null && exprId.id != attribute.exprId.id) {
+        AttributeReference(
+          attribute.name, attribute.dataType, attribute.nullable,
+          attribute.metadata)(exprId, attribute.qualifier)
+      } else {
+        attribute
+      }
+  }
+
+  /**
+   * Create a streaming relation from the input logical relation (source table)
+   *
+   * @param sparkSession spark session
+   * @param logicalRelation source table to convert
+   * @return sourceTable and its streaming relation
+   */
   private def prepareStreamingRelation(
       sparkSession: SparkSession,
-      r: LogicalRelation): (CarbonTable, StreamingRelation) = {
-    val sourceTable = r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+      logicalRelation: LogicalRelation): StreamingRelation = {
+    val sourceTable = logicalRelation.relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
     val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
-    val format = tblProperty.get("format")
+    val format = sourceTable.getFormat
     if (format == null) {
       throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
     }
-    val streamReader = sparkSession.readStream
+    val streamReader = sparkSession
+      .readStream
       .schema(getSparkSchema(sourceTable))
       .format(format)
-    val dataFrame = format match {
+    val dataFrame: DataFrame = format match {
       case "csv" | "text" | "json" | "parquet" =>
-        if (!tblProperty.containsKey("path")) {
-          throw new MalformedCarbonCommandException(
-            s"'path' tblproperty should be provided for '$format' format")
-        }
+        shouldHaveProperty(tblProperty, "path", sourceTable)
         streamReader.load(tblProperty.get("path"))
-      case "kafka" | "socket" =>
-        streamReader.load()
       case other =>
         throw new MalformedCarbonCommandException(s"Streaming from $format is not supported")
     }
@@ -116,8 +255,18 @@ case class CarbonCreateStreamCommand(
 
     // Since SparkSQL analyzer will match the UUID in attribute,
     // create a new StreamRelation and re-use the same attribute from LogicalRelation
-    (sourceTable,
-      StreamingRelation(streamRelation.dataSource, streamRelation.sourceName, r.output))
+    StreamingRelation(streamRelation.dataSource, streamRelation.sourceName, logicalRelation.output)
+  }
+
+  /** Fail the command unless `propertyName` is present in the source table properties. */
+  private def shouldHaveProperty(
+      tblProperty: java.util.Map[String, String], propertyName: String,
+      sourceTable: CarbonTable) : Unit = {
+    if (!tblProperty.containsKey(propertyName)) {
+      val tableName = s"${sourceTable.getDatabaseName}.${sourceTable.getTableName}"
+      throw new MalformedCarbonCommandException(
+        s"tblproperty '$propertyName' should be provided for stream source $tableName")
+    }
   }
 
   private def getSparkSchema(sourceTable: CarbonTable): StructType = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 31c9597..baf4664 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -1770,28 +1770,18 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         |CREATE STREAM stream123 ON TABLE sink
         |STMPROPERTIES(
         |  'trigger'='ProcessingTime',
-        |  'interval'='1 seconds')
-        |AS
-        |  SELECT *
-        |  FROM source
-        |  WHERE id % 2 = 1
-      """.stripMargin).show(false)
-    sql(
-      """
-        |CREATE STREAM IF NOT EXISTS stream123 ON TABLE sink
-        |STMPROPERTIES(
-        |  'trigger'='ProcessingTime',
-        |  'interval'='1 seconds')
+        |  'interval'='5 seconds')
         |AS
         |  SELECT *
         |  FROM source
         |  WHERE id % 2 = 1
       """.stripMargin).show(false)
+
     Thread.sleep(200)
     sql("select * from sink").show
 
     generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append)
-    Thread.sleep(5000)
+    Thread.sleep(7000)
 
     // after 2 minibatch, there should be 10 row added (filter condition: id%2=1)
     checkAnswer(sql("select count(*) from sink"), Seq(Row(10)))
@@ -2098,6 +2088,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   }
 
   test("StreamSQL: start stream on non-stream table") {
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+
     sql(
       s"""
          |CREATE TABLE notsource(
@@ -2143,7 +2136,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
           |  WHERE id % 2 = 1
         """.stripMargin).show(false)
     }
-    assert(ex.getMessage.contains("Must specify stream source table in the query"))
+    assert(ex.getMessage.contains("Must specify stream source table in the stream query"))
     sql("DROP TABLE sink")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d55c21a..a6679fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -218,6 +218,12 @@
         <scope>${spark.deps.scope}</scope>
       </dependency>
       <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+        <version>${spark.version}</version>
+        <scope>${spark.deps.scope}</scope>
+      </dependency>
+      <dependency>
         <groupId>org.scala-lang</groupId>
         <artifactId>scala-compiler</artifactId>
         <version>${scala.version}</version>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
index 6f7c398..00d8420 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
@@ -78,6 +78,9 @@ public class RowParserImpl implements RowParser {
 
   @Override
   public Object[] parseRow(Object[] row) {
+    if (row == null) {
+      return new String[numberOfColumns];
+    }
     // If number of columns are less in a row then create new array with same size of header.
     if (row.length < numberOfColumns) {
       String[] temp = new String[numberOfColumns];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 89f00c9..744915d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -419,7 +419,7 @@ public class StreamSegment {
    * @return the list of BlockIndex in the index file
    * @throws IOException
    */
-  private static List<BlockIndex> readIndexFile(String indexPath, FileFactory.FileType fileType)
+  public static List<BlockIndex> readIndexFile(String indexPath, FileFactory.FileType fileType)
       throws IOException {
     List<BlockIndex> blockIndexList = new ArrayList<>();
     CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);