Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/02/28 01:38:25 UTC

carbondata git commit: [CARBONDATA-2147][CARBONDATA-2148][Streaming] Add new row parser: RowStreamParserImpl

Repository: carbondata
Updated Branches:
  refs/heads/master d0858b73e -> 648c3d349


[CARBONDATA-2147][CARBONDATA-2148][Streaming] Add new row parser: RowStreamParserImpl

Currently the default value of 'carbon.stream.parser' is CSVStreamParserImp, which transforms InternalRow(0) into Array[Object]; InternalRow(0) holds the text of one line received from a socket. When the data comes from Kafka instead, the schema of the InternalRow changes, so the user must either assemble the fields of the Kafka Row into a single String and store it as InternalRow(0), or define a new parser that converts the Kafka Row into Array[Object]. Either way, the same work has to be repeated for every table.
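
A minimal sketch of the string-assembly workaround (assumed for illustration, not part of this commit): flatten each structured record back into one comma-separated line so that CSVStreamParserImp can split it again. The Order type and its fields are hypothetical stand-ins for a decoded Kafka payload.

import org.apache.spark.sql.SparkSession

// Illustrative record type, standing in for a deserialized Kafka message.
case class Order(id: Int, name: String, city: String)

object CsvWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("CsvWorkaroundSketch")
      .getOrCreate()
    import spark.implicits._

    // Pretend this Dataset came from Kafka after deserialization.
    val rows = Seq(Order(1, "name_1", "city_1"), Order(2, "name_2", "city_2")).toDS()

    // Re-assemble every field into a single CSV line, so the sink again
    // receives one string per record, exactly like the socket source does.
    val csvLines = rows.map(o => Seq(o.id, o.name, o.city).mkString(","))
    csvLines.show(truncate = false)

    spark.stop()
  }
}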

Solution:
Use a new parser, RowStreamParserImpl, which automatically converts an InternalRow into Array[Object] according to the schema. In general, source data is first transformed into a structured Row object, so there is no longer any need to define a dedicated parser for every table.
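
The committed parser lives below in streaming/parser/RowStreamParserImp.scala; conceptually it boils down to decoding each InternalRow with an encoder built from the write schema. A rough sketch of the idea (an assumed simplification, not the committed code):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.types.StructType

class RowParserSketch(schema: StructType) {
  // Build an encoder from the table's write schema and bind it, so it can
  // deserialize the InternalRow instances handed over by the streaming sink.
  private val encoder: ExpressionEncoder[Row] = RowEncoder(schema).resolveAndBind()

  def parse(internalRow: InternalRow): Array[Object] = {
    // Decode according to the schema, then expose the fields as objects.
    encoder.fromRow(internalRow).toSeq.map(_.asInstanceOf[Object]).toArray
  }
}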

This closes #1959


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/648c3d34
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/648c3d34
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/648c3d34

Branch: refs/heads/master
Commit: 648c3d3493d3bdf278c09021ac4596731d165715
Parents: d0858b7
Author: Zhang Zhichao <44...@qq.com>
Authored: Fri Feb 9 14:49:58 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Feb 28 09:30:28 2018 +0800

----------------------------------------------------------------------
 .../CarbonStructuredStreamingExample.scala      |  20 +-
 ...CarbonStructuredStreamingWithRowParser.scala | 217 +++++
 .../TestStreamingTableOperation.scala           |  23 +-
 .../TestStreamingTableWithRowParser.scala       | 944 +++++++++++++++++++
 .../streaming/parser/CSVStreamParserImp.java    |   3 +-
 .../streaming/parser/CarbonStreamParser.java    |   6 +-
 .../streaming/parser/RowStreamParserImp.scala   |  72 ++
 .../streaming/CarbonAppendableStreamSink.scala  |  25 +-
 8 files changed, 1272 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/648c3d34/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
index 247a59b..8ce4afc 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
@@ -23,8 +23,6 @@ import java.net.ServerSocket
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 
 // scalastyle:off println
@@ -34,23 +32,9 @@ object CarbonStructuredStreamingExample {
     // setup paths
     val rootPath = new File(this.getClass.getResource("/").getPath
                             + "../../../..").getCanonicalPath
-    val storeLocation = s"$rootPath/examples/spark2/target/store"
-    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
-    val metastoredb = s"$rootPath/examples/spark2/target"
-    val streamTableName = s"stream_table"
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
 
-    import org.apache.spark.sql.CarbonSession._
-    val spark = SparkSession
-      .builder()
-      .master("local")
-      .appName("CarbonStructuredStreamingExample")
-      .config("spark.sql.warehouse.dir", warehouse)
-      .getOrCreateCarbonSession(storeLocation, metastoredb)
-
-    spark.sparkContext.setLogLevel("ERROR")
+    val spark = ExampleUtils.createCarbonSession("CarbonStructuredStreamingExample", 4)
+    val streamTableName = s"stream_table"
 
     val requireCreateTable = true
     val useComplexDataType = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/648c3d34/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
new file mode 100644
index 0000000..f134a8d
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+case class FileElement(school: Array[String], age: Int)
+case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
+
+// scalastyle:off println
+object CarbonStructuredStreamingWithRowParser {
+  def main(args: Array[String]) {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+
+    val spark = ExampleUtils.createCarbonSession("CarbonStructuredStreamingWithRowParser", 4)
+    val streamTableName = s"stream_table_with_row_parser"
+
+    val requireCreateTable = true
+    val useComplexDataType = false
+
+    if (requireCreateTable) {
+      // drop the table if it already exists
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      if (useComplexDataType) {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT,
+             | file struct<school:array<string>, age:int>
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
+             | """.stripMargin)
+      } else {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name')
+             | """.stripMargin)
+      }
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = startStreaming(spark, tablePath)
+      val thread2 = writeSocket(serverSocket)
+      val thread3 = showTableCount(spark, streamTableName)
+
+      System.out.println("type enter to interrupt streaming")
+      System.in.read()
+      thread1.interrupt()
+      thread2.interrupt()
+      thread3.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // non-filter
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    if (useComplexDataType) {
+      // complex
+      spark.sql(s"select file.age, file.school " +
+                s"from ${ streamTableName } " +
+                s"where where file.age = 30 ").show(100, truncate = false)
+    }
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          Thread.sleep(1000 * 3)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          import spark.implicits._
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", 7071)
+            .load()
+            .as[String]
+            .map(_.split(","))
+            .map { fields => {
+              val tmp = fields(4).split("\\$")
+              val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+              if (fields(0).toInt % 2 == 0) {
+                StreamData(fields(0).toInt, null, fields(2), fields(3).toFloat, file)
+              } else {
+                StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file)
+              }
+            } }
+
+          // Write data from socket stream to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime("5 seconds"))
+            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("dbName", "default")
+            .option("tableName", "stream_table_with_row_parser")
+            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+              "org.apache.carbondata.streaming.parser.RowStreamParserImp")
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case ex: Exception =>
+            ex.printStackTrace()
+            println("Done reading and writing streaming data")
+        } finally {
+          if (qry != null) {
+            qry.stop()
+          }
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for the client's connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write 1001 records per iteration
+          for (_ <- 0 to 1000) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 10000.00).toString +
+                                 ",school_" + index + ":school_" + index + index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(1000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/648c3d34/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 5644302..4ae3737 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -1149,7 +1149,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         val clientSocket = serverSocket.accept()
         val socketWriter = new PrintWriter(clientSocket.getOutputStream())
         var index = 0
-        var timeRow = true
         for (_ <- 1 to writeNums) {
           // write 5 records per iteration
           val stringBuilder = new StringBuilder()
@@ -1165,20 +1164,16 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
                                      + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
                                      ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
                                      ",school_" + index + ":school_" + index + index + "$" + index)
+              } else if (index == 9) {
+                stringBuilder.append(index.toString + ",name_" + index
+                                     + ",city_" + index + "," + (10000.00 * index).toString + ",0.04,80.04" +
+                                     ",1990-01-04,2010-01-04 10:01:01,2010-01-04 10:01:01" +
+                                     ",school_" + index + ":school_" + index + index + "$" + index)
               } else {
-
-                if (index == 9 && timeRow) {
-                  timeRow = false
-                  stringBuilder.append(index.toString + ",name_" + index
-                                       + ",city_" + index + "," + (10000.00 * index).toString + ",0.04,80.04" +
-                                       ",1990-01-04,2010-01-04 10:01:01,2010-01-04 10:01:01" +
-                                       ",school_" + index + ":school_" + index + index + "$" + index)
-                } else {
-                  stringBuilder.append(index.toString + ",name_" + index
-                                       + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
-                                       ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
-                                       ",school_" + index + ":school_" + index + index + "$" + index)
-                }
+                stringBuilder.append(index.toString + ",name_" + index
+                                     + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+                                     ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
+                                     ",school_" + index + ":school_" + index + index + "$" + index)
               }
             } else {
               stringBuilder.append(index.toString + ",name_" + index

http://git-wip-us.apache.org/repos/asf/carbondata/blob/648c3d34/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
new file mode 100644
index 0000000..a3df2be
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -0,0 +1,944 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.{File, PrintWriter}
+import java.math.BigDecimal
+import java.net.{BindException, ServerSocket}
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+case class FileElement(school: Array[String], age: Integer)
+case class StreamData(id: Integer, name: String, city: String, salary: java.lang.Float,
+    tax: BigDecimal, percent: java.lang.Double, birthday: String,
+    register: String, updated: String,
+    file: FileElement)
+
+class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
+
+  private val spark = sqlContext.sparkSession
+  private val dataFilePath = s"$resourcesPath/streamSample.csv"
+
+  override def beforeAll {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+    sql("DROP DATABASE IF EXISTS streaming1 CASCADE")
+    sql("CREATE DATABASE streaming1")
+    sql("USE streaming1")
+
+    dropTable()
+
+    createTable(tableName = "stream_table_filter", streaming = true, withBatchLoad = true)
+
+    createTableWithComplexType(
+      tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true)
+  }
+
+  override def afterAll {
+    dropTable()
+    sql("USE default")
+    sql("DROP DATABASE IF EXISTS streaming1 CASCADE")
+  }
+
+  def dropTable(): Unit = {
+    sql("drop table if exists streaming1.stream_table_filter")
+    sql("drop table if exists streaming1.stream_table_filter_complex")
+  }
+
+  test("query on stream table with dictionary, sort_columns") {
+    executeStreamingIngest(
+      tableName = "stream_table_filter",
+      batchNums = 2,
+      rowNumsEachBatch = 25,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = true,
+      badRecordAction = "force",
+      autoHandoff = false
+    )
+
+    // non-filter
+    val result = sql("select * from streaming1.stream_table_filter order by id, name").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(1).isNullAt(0))
+    assert(result(1).getString(1) == "name_6")
+    // check one row of batch loading
+    assert(result(50).getInt(0) == 100000001)
+    assert(result(50).getString(1) == "batch_1")
+
+    // filter
+    checkAnswer(
+      sql("select * from stream_table_filter where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name = 'name_3'"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name like '%me_3%' and id < 30"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(sql("select count(*) from stream_table_filter where name like '%ame%'"),
+      Seq(Row(49)))
+
+    checkAnswer(sql("select count(*) from stream_table_filter where name like '%batch%'"),
+      Seq(Row(5)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name >= 'name_3' and id < 4"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city = 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(sql("select count(*) from stream_table_filter where city like '%city%'"),
+      Seq(Row(54)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city between 'city_09' and 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary = 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary > 80000 and salary <= 100000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary between 80001 and 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax = 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax >= 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and tax <> 0.01"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent = 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent >= 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and percent <> 80.01"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday = '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null order by name"),
+      Seq(Row(null, "", "", null, null, null, null, null, null),
+        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and name <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and city <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and salary is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and tax is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and percent is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and birthday is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null),
+        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and register is not null"),
+      Seq())
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and updated is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    // agg
+    checkAnswer(
+      sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
+          "from stream_table_filter where id >= 2 and id <= 100000004"),
+      Seq(Row(51, 100000004, "batch_1", 7843162, 400001276)))
+
+    checkAnswer(
+      sql("select city, count(id), sum(id), cast(avg(id) as integer), " +
+          "max(salary), min(salary) " +
+          "from stream_table_filter " +
+          "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+          "and city <> '' " +
+          "group by city " +
+          "order by city"),
+      Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1),
+        Row("city_2", 1, 100000002, 100000002, 0.2, 0.2),
+        Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3)))
+
+    // batch loading
+    for(_ <- 0 to 2) {
+      executeBatchLoad("stream_table_filter")
+    }
+    checkAnswer(
+      sql("select count(*) from streaming1.stream_table_filter"),
+      Seq(Row(25 * 2 + 5 + 5 * 3)))
+
+    sql("alter table streaming1.stream_table_filter compact 'minor'")
+    Thread.sleep(5000)
+    val result1 = sql("show segments for table streaming1.stream_table_filter").collect()
+    result1.foreach { row =>
+      if (row.getString(0).equals("1")) {
+        assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
+        assertResult(FileFormat.ROW_V1.toString)(row.getString(5))
+      } else if (row.getString(0).equals("0.1")) {
+        assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+      } else {
+        assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+      }
+    }
+
+  }
+
+  test("query on stream table with dictionary, sort_columns and complex column") {
+    executeStreamingIngest(
+      tableName = "stream_table_filter_complex",
+      batchNums = 2,
+      rowNumsEachBatch = 25,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = true,
+      badRecordAction = "force",
+      autoHandoff = false
+    )
+
+    // non-filter
+    val result = sql("select * from streaming1.stream_table_filter_complex order by id, name").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(0).isNullAt(0))
+    assert(result(0).getString(1) == "")
+    assert(result(0).getStruct(9).isNullAt(1))
+    // check one row of batch loading
+    assert(result(50).getInt(0) == 100000001)
+    assert(result(50).getString(1) == "batch_1")
+    assert(result(50).getStruct(9).getInt(1) == 20)
+
+    // filter
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name = 'name_3'"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name like '%me_3%' and id < 30"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%ame%'"),
+      Seq(Row(49)))
+
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%batch%'"),
+      Seq(Row(5)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name >= 'name_3' and id < 4"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city = 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where city like '%city%'"),
+      Seq(Row(54)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city > 'city_09' and city < 'city_10'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city between 'city_09' and 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary = 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary > 80000 and salary <= 100000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_10", "school_1010")), 10))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary between 80001 and 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax = 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax >= 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax < 0.05 and tax > 0.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax between 0.02 and 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent = 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent >= 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent < 80.05 and percent > 80.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent between 80.02 and 80.05 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday = '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null order by name"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null)),
+        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and name <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and city <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and salary is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and tax is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and salary is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null)),
+        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and register is not null"),
+      Seq())
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and updated is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    // agg
+    checkAnswer(
+      sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age) " +
+          "from stream_table_filter_complex where id >= 2 and id <= 100000004"),
+      Seq(Row(51, 100000004, "batch_1", 27, 1406)))
+
+    checkAnswer(
+      sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " +
+          "max(salary), min(salary) " +
+          "from stream_table_filter_complex " +
+          "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+          "and city <> '' " +
+          "group by city " +
+          "order by city"),
+      Seq(Row("city_1", 2, 100000002, 10, 10000.0, 0.1),
+        Row("city_2", 1, 100000002, 30, 0.2, 0.2),
+        Row("city_3", 2, 100000006, 21, 30000.0, 0.3)))
+  }
+
+  def createWriteSocketThread(
+      serverSocket: ServerSocket,
+      writeNums: Int,
+      rowNums: Int,
+      intervalSecond: Int,
+      badRecords: Boolean = false): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        // wait for the client's connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
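+        // a well-formed generated line looks like:
+        // "1,name_1,city_1,10000.0,0.01,80.01,1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01,school_1:school_11$1"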
+        for (_ <- 1 to writeNums) {
+          // write rowNums records per iteration
+          val stringBuilder = new StringBuilder()
+          for (_ <- 1 to rowNums) {
+            index = index + 1
+            if (badRecords) {
+              if (index == 2) {
+                // null value
+                stringBuilder.append(",,,,,,,,,")
+              } else if (index == 6) {
+                // illegal id value and malformed register timestamp
+                stringBuilder.append(index.toString + "abc,name_" + index
+                                     + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+                                     ",1990-01-01,2010-01-0110:01:01,2010-01-01 10:01:01" +
+                                     ",school_" + index + ":school_" + index + index + "$" + index)
+              } else if (index == 9) {
+                stringBuilder.append(index.toString + ",name_" + index
+                                     + ",city_" + index + "," + (10000.00 * index).toString + ",0.04,80.04" +
+                                     ",1990-01-04,2010-01-04 10:01:01,2010-01-04 10:01:01" +
+                                     ",school_" + index + ":school_" + index + index + "$" + index)
+              } else {
+                stringBuilder.append(index.toString + ",name_" + index
+                                     + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+                                     ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
+                                     ",school_" + index + ":school_" + index + index + "$" + index)
+              }
+            } else {
+              stringBuilder.append(index.toString + ",name_" + index
+                                   + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+                                   ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
+                                   ",school_" + index + ":school_" + index + index + "$" + index)
+            }
+            stringBuilder.append("\n")
+          }
+          socketWriter.append(stringBuilder.toString())
+          socketWriter.flush()
+          Thread.sleep(1000 * intervalSecond)
+        }
+        socketWriter.close()
+      }
+    }
+  }
+
+  def createSocketStreamingThread(
+      spark: SparkSession,
+      port: Int,
+      tablePath: CarbonTablePath,
+      tableIdentifier: TableIdentifier,
+      badRecordAction: String = "force",
+      intervalSecond: Int = 2,
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+  ): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          import spark.implicits._
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", port)
+            .load()
+            .as[String]
+            .map(_.split(","))
+            .map { fields => {
+              if (fields.length == 0) {
+                StreamData(null, "", "", null, null, null, null, null, null, null)
+              } else {
+                val tmp = fields(9).split("\\$")
+                val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+                if (fields(1).equals("name_6")) {
+                  StreamData(null, fields(1), fields(2), fields(3).toFloat,
+                      BigDecimal.valueOf(fields(4).toDouble), fields(5).toDouble,
+                      fields(6), fields(7), fields(8), file)
+                } else {
+                  StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat,
+                      BigDecimal.valueOf(fields(4).toDouble), fields(5).toDouble,
+                      fields(6), fields(7), fields(8), file)
+                }
+              }
+            } }
+
+          // Write data from socket stream to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime(s"$intervalSecond seconds"))
+            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("bad_records_action", badRecordAction)
+            .option("dbName", tableIdentifier.database.get)
+            .option("tableName", tableIdentifier.table)
+            .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
+            .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+            .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
+            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+              "org.apache.carbondata.streaming.parser.RowStreamParserImp")
+            .start()
+          qry.awaitTermination()
+        } catch {
+          case ex: Exception =>
+            throw new Exception(ex.getMessage, ex)
+        } finally {
+          if (null != qry) {
+            qry.stop()
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * start ingestion thread: write `rowNumsEachBatch` rows repeatedly, `batchNums` times.
+   */
+  def executeStreamingIngest(
+      tableName: String,
+      batchNums: Int,
+      rowNumsEachBatch: Int,
+      intervalOfSource: Int,
+      intervalOfIngest: Int,
+      continueSeconds: Int,
+      generateBadRecords: Boolean,
+      badRecordAction: String,
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+  ): Unit = {
+    val identifier = new TableIdentifier(tableName, Option("streaming1"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    var server: ServerSocket = null
+    try {
+      server = getServerSocket()
+      val thread1 = createWriteSocketThread(
+        serverSocket = server,
+        writeNums = batchNums,
+        rowNums = rowNumsEachBatch,
+        intervalSecond = intervalOfSource,
+        badRecords = generateBadRecords)
+      val thread2 = createSocketStreamingThread(
+        spark = spark,
+        port = server.getLocalPort,
+        tablePath = tablePath,
+        tableIdentifier = identifier,
+        badRecordAction = badRecordAction,
+        intervalSecond = intervalOfIngest,
+        handoffSize = handoffSize,
+        autoHandoff = autoHandoff)
+      thread1.start()
+      thread2.start()
+      Thread.sleep(continueSeconds * 1000)
+      thread2.interrupt()
+      thread1.interrupt()
+    } finally {
+      if (null != server) {
+        server.close()
+      }
+    }
+  }
+
+  def createTable(tableName: String, streaming: Boolean, withBatchLoad: Boolean): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE streaming1.$tableName(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+         | 'sort_columns'='name', 'dictionary_include'='city,register')
+         | """.stripMargin)
+
+    if (withBatchLoad) {
+      // batch loading 5 rows
+      executeBatchLoad(tableName)
+    }
+  }
+
+  def createTableWithComplexType(
+      tableName: String,
+      streaming: Boolean,
+      withBatchLoad: Boolean): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE streaming1.$tableName(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP,
+         | file struct<school:array<string>, age:int>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+         | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated')
+         | """.stripMargin)
+
+    if (withBatchLoad) {
+      // batch loading 5 rows
+      executeBatchLoad(tableName)
+    }
+  }
+
+  def executeBatchLoad(tableName: String): Unit = {
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$dataFilePath'
+         | INTO TABLE streaming1.$tableName
+         | OPTIONS('HEADER'='true')
+         """.stripMargin)
+  }
+
+  def wrap(array: Array[String]): mutable.WrappedArray[String] = {
+    new mutable.WrappedArray.ofRef(array)
+  }
+
+  /**
+   * get a ServerSocket.
+   * if the address is already in use, it retries with a new port number.
+   *
+   * @return ServerSocket
+   */
+  def getServerSocket(): ServerSocket = {
+    var port = 7071
+    var serverSocket: ServerSocket = null
+    var retry = false
+    do {
+      try {
+        retry = false
+        serverSocket = new ServerSocket(port)
+      } catch {
+        case ex: BindException =>
+          retry = true
+          port = port + 2
+          if (port >= 65535) {
+            throw ex
+          }
+      }
+    } while (retry)
+    serverSocket
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/648c3d34/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
index eed3fd5..00d06b6 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
@@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * CSV Stream Parser, it is also the default parser.
@@ -31,7 +32,7 @@ public class CSVStreamParserImp implements CarbonStreamParser {
 
   private CsvParser csvParser;
 
-  @Override public void initialize(Configuration configuration) {
+  @Override public void initialize(Configuration configuration, StructType structType) {
     CsvParserSettings settings = CSVInputFormat.extractCsvParserSettings(configuration);
     csvParser = new CsvParser(settings);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/648c3d34/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
index a3b5592..643758c 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.streaming.parser;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * Stream parser interface
@@ -27,9 +28,10 @@ public interface CarbonStreamParser {
 
   String CARBON_STREAM_PARSER = "carbon.stream.parser";
 
-  String CARBON_STREAM_PARSER_DEFAULT = "org.apache.carbondata.streaming.parser.CSVStreamParserImp";
+  String CARBON_STREAM_PARSER_DEFAULT =
+      "org.apache.carbondata.streaming.parser.CSVStreamParserImp";
 
-  void initialize(Configuration configuration);
+  void initialize(Configuration configuration, StructType structType);
 
   Object[] parserRow(InternalRow value);
 

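For illustration only (not part of this commit): a custom parser written
against the updated two-argument initialize could look like the Scala sketch
below. The class name and the render-everything-as-string behavior are
assumptions, not CarbonData API.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.StructType

    import org.apache.carbondata.streaming.parser.CarbonStreamParser

    // hypothetical parser: uses the schema handed to initialize() to read
    // each field from the InternalRow and render it as a plain string
    class StringifyStreamParser extends CarbonStreamParser {

      private var schema: StructType = _

      override def initialize(configuration: Configuration, structType: StructType): Unit = {
        schema = structType
      }

      override def parserRow(value: InternalRow): Array[Object] = {
        (0 until schema.length).map { i =>
          if (value.isNullAt(i)) null
          else value.get(i, schema(i).dataType).toString
        }.toArray[Object]
      }

      override def close(): Unit = {}
    }
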
http://git-wip-us.apache.org/repos/asf/carbondata/blob/648c3d34/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
new file mode 100644
index 0000000..5a227cf
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming.parser
+
+import java.text.SimpleDateFormat
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * SparkSQL Row stream parser: converts an InternalRow to Array[Object]
+ * according to the table schema.
+ */
+class RowStreamParserImp extends CarbonStreamParser {
+
+  var configuration: Configuration = null
+  var structType: StructType = null
+  var encoder: ExpressionEncoder[Row] = null
+
+  var timeStampFormat: SimpleDateFormat = null
+  var dateFormat: SimpleDateFormat = null
+  var complexDelimiterLevel1: String = null
+  var complexDelimiterLevel2: String = null
+  var serializationNullFormat: String = null
+
+  override def initialize(configuration: Configuration, structType: StructType): Unit = {
+    this.configuration = configuration
+    this.structType = structType
+    this.encoder = RowEncoder.apply(this.structType).resolveAndBind()
+
+    this.timeStampFormat = new SimpleDateFormat(
+      this.configuration.get(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT))
+    this.dateFormat = new SimpleDateFormat(
+      this.configuration.get(CarbonCommonConstants.CARBON_DATE_FORMAT))
+    this.complexDelimiterLevel1 = this.configuration.get("carbon_complex_delimiter_level_1")
+    this.complexDelimiterLevel2 = this.configuration.get("carbon_complex_delimiter_level_2")
+    this.serializationNullFormat =
+      this.configuration.get(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+  }
+
+  override def parserRow(value: InternalRow): Array[Object] = {
+    this.encoder.fromRow(value).toSeq.map { x => {
+      CarbonScalaUtil.getString(x,
+        serializationNullFormat, complexDelimiterLevel1, complexDelimiterLevel2,
+        timeStampFormat, dateFormat)
+    } }.toArray
+  }
+
+  override def close(): Unit = {
+  }
+}
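
A hedged note on output: assuming CarbonData's default complex delimiters
('$' for level 1, ':' for level 2), parserRow flattens a struct field into a
single delimited string. For the complex-type test schema above, illustratively:

    // illustration only: a source row such as
    //   StreamData(9, "name_9", ..., FileElement(Array("school_9", "school_99"), 9))
    // is rendered by parserRow as flat strings:
    //   Array("9", "name_9", ..., "school_9:school_99$9")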

http://git-wip-us.apache.org/repos/asf/carbondata/blob/648c3d34/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 67d8a4d..f2f9853 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 import org.apache.carbondata.common.CarbonIterator
@@ -44,6 +45,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.streaming.{CarbonStreamException, StreamHandoffRDD}
@@ -73,6 +75,20 @@ class CarbonAppendableStreamSink(
     parameters.foreach { entry =>
       conf.set(entry._1, entry._2)
     }
+    // properties below will be used by RowStreamParserImp to convert row values to strings
+    conf.set("carbon_complex_delimiter_level_1",
+      carbonLoadModel.getComplexDelimiterLevel1)
+    conf.set("carbon_complex_delimiter_level_2",
+      carbonLoadModel.getComplexDelimiterLevel2)
+    conf.set(
+      DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
+      carbonLoadModel.getSerializationNullFormat().split(",")(1))
+    conf.set(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      carbonLoadModel.getTimestampformat())
+    conf.set(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      carbonLoadModel.getDateFormat())
     conf
   }
   // segment max size(byte)
@@ -223,6 +239,7 @@ object CarbonAppendableStreamSink {
           server.get.initializeDictionaryGenerator(carbonTable)
         }
 
+        val rowSchema = queryExecution.analyzed.schema
         // write data file
         result = sparkSession.sparkContext.runJob(queryExecution.toRdd,
           (taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
@@ -233,7 +250,8 @@ object CarbonAppendableStreamSink {
               sparkPartitionId = taskContext.partitionId(),
               sparkAttemptNumber = taskContext.attemptNumber(),
               committer,
-              iterator
+              iterator,
+              rowSchema
             )
           })
 
@@ -280,7 +298,8 @@ object CarbonAppendableStreamSink {
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
-      iterator: Iterator[InternalRow]
+      iterator: Iterator[InternalRow],
+      rowSchema: StructType
   ): TaskCommitMessage = {
 
     val jobId = CarbonInputFormatUtil.getJobId(new Date, sparkStageId)
@@ -311,7 +330,7 @@ object CarbonAppendableStreamSink {
 
         val streamParser =
           Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser]
-        streamParser.initialize(taskAttemptContext.getConfiguration)
+        streamParser.initialize(taskAttemptContext.getConfiguration, rowSchema)
 
         StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
           taskAttemptContext, carbonLoadModel)