Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/21 17:59:43 UTC

[16/50] [abbrv] carbondata git commit: [CARBONDATA-3064] Support separate audit log

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
new file mode 100644
index 0000000..d789f5c
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
@@ -0,0 +1,2647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.{File, PrintWriter}
+import java.math.BigDecimal
+import java.net.{BindException, ServerSocket}
+import java.sql.{Date, Timestamp}
+import java.util.concurrent.Executors
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.NoSuchStreamException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
+
+  private val spark = sqlContext.sparkSession
+  private val dataFilePath = s"$resourcesPath/streamSample.csv"
+  def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../")
+    .getCanonicalPath
+  val badRecordFilePath: File = new File(currentPath + "/target/test/badRecords")
+
+  override def beforeAll {
+    badRecordFilePath.delete()
+    badRecordFilePath.mkdirs()
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+    sql("DROP DATABASE IF EXISTS streaming CASCADE")
+    sql("CREATE DATABASE streaming")
+    sql("USE streaming")
+    sql(
+      """
+        | CREATE TABLE source(
+        |    c1 string,
+        |    c2 int,
+        |    c3 string,
+        |    c5 string
+        | ) STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES ('streaming' = 'true')
+      """.stripMargin)
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO TABLE source""")
+
+    dropTable()
+
+    // 1. a normal table does not support streaming ingest
+    createTable(tableName = "batch_table", streaming = false, withBatchLoad = true)
+
+    // 2. streaming table with different input source
+    // file source
+    createTable(tableName = "stream_table_file", streaming = true, withBatchLoad = true)
+
+    // 3. streaming table with bad records
+    createTable(tableName = "bad_record_fail", streaming = true, withBatchLoad = true)
+
+    // 4. streaming frequency check
+    createTable(tableName = "stream_table_1s", streaming = true, withBatchLoad = true)
+
+    // 5. streaming table executes batch loading
+    // 6. detail query
+    // 8. compaction
+    // full scan + filter scan + aggregate query
+    createTable(tableName = "stream_table_filter", streaming = true, withBatchLoad = true)
+
+    createTableWithComplexType(
+      tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true)
+
+
+    // 11. table for delete segment test
+    createTable(tableName = "stream_table_delete_id", streaming = true, withBatchLoad = false)
+    createTable(tableName = "stream_table_delete_date", streaming = true, withBatchLoad = false)
+
+    // 12. reject alter streaming properties
+    // 13. handoff streaming segment and finish streaming
+    createTable(tableName = "stream_table_handoff", streaming = false, withBatchLoad = false)
+
+    // 15. auto handoff streaming segment
+    // 16. close streaming table
+    // 17. reopen streaming table after close
+    // 9. create new stream segment if current stream segment is full
+    createTable(tableName = "stream_table_reopen", streaming = true, withBatchLoad = false)
+
+    // 18. block drop table while streaming is in progress
+    createTable(tableName = "stream_table_drop", streaming = true, withBatchLoad = false)
+
+    // 19. block streaming on 'preaggregate' main table
+    createTable(tableName = "agg_table_block", streaming = false, withBatchLoad = false)
+
+    createTable(tableName = "agg_table", streaming = true, withBatchLoad = false)
+
+    createTable(tableName = "stream_table_empty", streaming = true, withBatchLoad = false)
+
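+    // generate two identical 5-row CSV batches (ids 10..14); each name appears twice,
+    // which the doubled sums in the aggregate checks of later tests depend on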
+    val csvDataDir = integrationPath + "/spark2/target/csvdatanew"
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append)
+  }
+
+  test("validate streaming property") {
+    sql(
+      """
+        | CREATE TABLE correct(
+        |    c1 string
+        | ) STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES ('streaming' = 'true')
+      """.stripMargin)
+    sql("DROP TABLE correct")
+    sql(
+      """
+        | CREATE TABLE correct(
+        |    c1 string
+        | ) STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES ('streaming' = 'false')
+      """.stripMargin)
+    sql("DROP TABLE correct")
+    val exceptionMsg = intercept[MalformedCarbonCommandException] {
+      sql(
+        """
+          | create table wrong(
+          |    c1 string
+          | ) STORED BY 'org.apache.carbondata.format'
+          | TBLPROPERTIES ('streaming' = 'invalid')
+        """.stripMargin)
+    }
+    assert(exceptionMsg.getMessage.equals("Table property 'streaming' should be either 'true' or 'false'"))
+  }
+
+  test("test blocking update and delete operation on streaming table") {
+    val exceptionMsgUpdate = intercept[MalformedCarbonCommandException] {
+      sql("""UPDATE source d SET (d.c2) = (d.c2 + 1) WHERE d.c1 = 'a'""").collect()
+    }
+    val exceptionMsgDelete = intercept[MalformedCarbonCommandException] {
+      sql("""DELETE FROM source WHERE d.c1 = 'a'""").collect()
+    }
+    assert(exceptionMsgUpdate.getMessage.equals("Data update is not allowed for streaming table"))
+    assert(exceptionMsgDelete.getMessage.equals("Data delete is not allowed for streaming table"))
+  }
+
+  test("test blocking alter table operation on streaming table") {
+    val addColException = intercept[MalformedCarbonCommandException] {
+      sql("""ALTER TABLE source ADD COLUMNS (c6 string)""").collect()
+    }
+    val dropColException = intercept[MalformedCarbonCommandException] {
+      sql("""ALTER TABLE source DROP COLUMNS (c1)""").collect()
+    }
+    val renameException = intercept[MalformedCarbonCommandException] {
+      sql("""ALTER TABLE source RENAME to t""").collect()
+    }
+    val changeDataTypeException = intercept[MalformedCarbonCommandException] {
+      sql("""ALTER TABLE source CHANGE c2 c2 bigint""").collect()
+    }
+    assertResult("Alter table add column is not allowed for streaming table")(addColException.getMessage)
+    assertResult("Alter table drop column is not allowed for streaming table")(dropColException.getMessage)
+    assertResult("Alter rename table is not allowed for streaming table")(renameException.getMessage)
+    assertResult("Alter table change datatype is not allowed for streaming table")(changeDataTypeException.getMessage)
+  }
+
+  override def afterAll {
+    dropTable()
+    sql("USE default")
+    sql("DROP DATABASE IF EXISTS streaming CASCADE")
+    var csvDataDir = integrationPath + "/spark2/target/csvdatanew"
+    badRecordFilePath.delete()
+    new File(csvDataDir).delete()
+    csvDataDir = integrationPath + "/spark2/target/csvdata"
+    new File(csvDataDir).delete()
+  }
+
+  def dropTable(): Unit = {
+    sql("drop table if exists streaming.batch_table")
+    sql("drop table if exists streaming.stream_table_file")
+    sql("drop table if exists streaming.bad_record_fail")
+    sql("drop table if exists streaming.stream_table_1s")
+    sql("drop table if exists streaming.stream_table_filter ")
+    sql("drop table if exists streaming.stream_table_filter_complex")
+    sql("drop table if exists streaming.stream_table_delete_id")
+    sql("drop table if exists streaming.stream_table_delete_date")
+    sql("drop table if exists streaming.stream_table_handoff")
+    sql("drop table if exists streaming.stream_table_reopen")
+    sql("drop table if exists streaming.stream_table_drop")
+    sql("drop table if exists streaming.agg_table_block")
+    sql("drop table if exists streaming.stream_table_empty")
+  }
+
+  // a normal table does not support streaming ingest
+  test("normal table does not support streaming ingest and altering a normal table's streaming property") {
+    // alter normal table's streaming property
+    val msg = intercept[MalformedCarbonCommandException](sql("alter table streaming.batch_table set tblproperties('streaming'='false')"))
+    assertResult("Streaming property value is incorrect")(msg.getMessage)
+
+    val identifier = new TableIdentifier("batch_table", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    var server: ServerSocket = null
+    try {
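+      // thread1 feeds rows to the socket source while thread2 runs the streaming sink;
+      // the sink is expected to fail because batch_table is not a streaming table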
+      server = getServerSocket()
+      val thread1 = createWriteSocketThread(server, 2, 10, 1)
+      thread1.start()
+      // use a thread pool so the exception thrown by the sink thread can be caught via the Future
+      val pool = Executors.newSingleThreadExecutor()
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier)
+      val future = pool.submit(thread2)
+      Thread.sleep(1000)
+      thread1.interrupt()
+      val msg = intercept[Exception] {
+        future.get()
+      }
+      assert(msg.getMessage.contains("is not a streaming table"))
+    } finally {
+      if (server != null) {
+        server.close()
+      }
+    }
+  }
+
+  // input source: file
+  test("streaming ingest from file source") {
+    val identifier = new TableIdentifier("stream_table_file", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdata").getCanonicalPath
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(2000)
+    generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir)
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_file"),
+      Seq(Row(25))
+    )
+
+    val row = sql("select * from streaming.stream_table_file order by id").head()
+    val expectedRow = Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))
+    assertResult(expectedRow)(row)
+  }
+
+  test("test preaggregate table creation on streaming table without handoff") {
+    val identifier = new TableIdentifier("agg_table", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    // streaming ingest 10 rows
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.agg_table"),
+      Seq(Row(10)))
+    sql("create datamap p1 on table agg_table using 'preaggregate' as select name, sum(salary) from agg_table group by name")
+    // No data should be loaded into the aggregate table because handoff has not been fired yet
+    checkAnswer(sql("select * from agg_table_p1"), Seq())
+  }
+
+  test("test if data is loaded into preaggregate after handoff is fired") {
+    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
+    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    // streaming ingest 10 rows
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.agg_table2"),
+      Seq(Row(10)))
+    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
+    sql("create datamap p2 on table agg_table2 using 'preaggregate' as select name, avg(salary) from agg_table2 group by name")
+    sql("create datamap p3 on table agg_table2 using 'preaggregate' as select name, min(salary) from agg_table2 group by name")
+    sql("create datamap p4 on table agg_table2 using 'preaggregate' as select name, max(salary) from agg_table2 group by name")
+    sql("create datamap p5 on table agg_table2 using 'preaggregate' as select name, count(salary) from agg_table2 group by name")
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    // Data should be loaded into the aggregate table because handoff has been fired
+    checkAnswer(sql("select * from agg_table2_p1"),
+      Seq(
+        Row("name_10", 200000.0),
+        Row("name_11", 220000.0),
+        Row("name_12", 240000.0),
+        Row("name_13", 260000.0),
+        Row("name_14", 280000.0)))
+    checkAnswer(sql("select * from agg_table2_p2"),
+      Seq(
+        Row("name_10", 200000.0, 2.0),
+        Row("name_11", 220000.0, 2.0),
+        Row("name_12", 240000.0, 2.0),
+        Row("name_13", 260000.0, 2.0),
+        Row("name_14", 280000.0, 2.0)))
+    checkAnswer(sql("select * from agg_table2_p3"),
+      Seq(
+        Row("name_10", 100000.0),
+        Row("name_11", 110000.0),
+        Row("name_12", 120000.0),
+        Row("name_13", 130000.0),
+        Row("name_14", 140000.0)))
+    checkAnswer(sql("select * from agg_table2_p4"),
+      Seq(
+        Row("name_10", 100000.0),
+        Row("name_11", 110000.0),
+        Row("name_12", 120000.0),
+        Row("name_13", 130000.0),
+        Row("name_14", 140000.0)))
+    checkAnswer(sql("select * from agg_table2_p5"),
+      Seq(
+        Row("name_10", 2.0),
+        Row("name_11", 2.0),
+        Row("name_12", 2.0),
+        Row("name_13", 2.0),
+        Row("name_14", 2.0)))
+    sql("drop table agg_table2")
+  }
+
+  test("test whether data is loaded into preaggregate after handoff is fired") {
+    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
+    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    // streaming ingest 10 rows
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.agg_table2"),
+      Seq(Row(10)))
+    sql(s"load data inpath '$csvDataDir' into table agg_table2 options('FILEHEADER'='id, name, city, salary, tax, percent, birthday, register, updated, file')")
+    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    // Data should be loaded into the aggregate table because handoff has been fired
+    checkAnswer(sql("select name, sum(salary) from agg_table2 group by name"),
+      Seq(
+        Row("name_10", 400000.0),
+        Row("name_14", 560000.0),
+        Row("name_12", 480000.0),
+        Row("name_11", 440000.0),
+        Row("name_13", 520000.0)))
+    checkAnswer(sql("select * from agg_table2_p1"),
+      Seq(
+        Row("name_10", 200000.0),
+        Row("name_11", 220000.0),
+        Row("name_12", 240000.0),
+        Row("name_13", 260000.0),
+        Row("name_14", 280000.0),
+        Row("name_10", 200000.0),
+        Row("name_11", 220000.0),
+        Row("name_12", 240000.0),
+        Row("name_13", 260000.0),
+        Row("name_14", 280000.0)))
+
+    sql("drop table agg_table2")
+  }
+
+  test("test whether data is loaded into preaggregate before handoff is fired") {
+    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
+    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    // streaming ingest 10 rows
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.agg_table2"),
+      Seq(Row(10)))
+    sql(s"load data inpath '$csvDataDir' into table agg_table2 options('FILEHEADER'='id, name, city, salary, tax, percent, birthday, register, updated, file')")
+    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
+    // Only the batch-loaded data should be visible in the aggregate table, since the streaming segment has not been handed off
+    checkAnswer(sql("select name, sum(salary) from agg_table2 group by name"),
+      Seq(
+        Row("name_10", 400000.0),
+        Row("name_14", 560000.0),
+        Row("name_12", 480000.0),
+        Row("name_11", 440000.0),
+        Row("name_13", 520000.0)))
+    checkAnswer(sql("select * from agg_table2_p1"),
+      Seq(
+        Row("name_10", 200000.0),
+        Row("name_11", 220000.0),
+        Row("name_12", 240000.0),
+        Row("name_13", 260000.0),
+        Row("name_14", 280000.0)))
+
+    sql("drop table agg_table2")
+  }
+
+  test("test if timeseries load is successful when created on streaming table") {
+    sql("drop table if exists timeseries_table")
+    createTable(tableName = "timeseries_table", streaming = true, withBatchLoad = false)
+    val identifier = new TableIdentifier("timeseries_table", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_second ON TABLE timeseries_table
+         | USING '${TIMESERIES.toString}'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='register',
+         | 'SECOND_GRANULARITY'='1')
+         | AS SELECT register, SUM(id) FROM timeseries_table
+         | GROUP BY register
+       """.stripMargin)
+    sql("alter table timeseries_table finish streaming")
+    sql("alter table timeseries_table compact 'streaming'")
+    checkAnswer( sql("select * FROM timeseries_table_agg0_second"), Seq(Row(Timestamp.valueOf("2010-01-01 10:01:01.0"), 120)))
+  }
+
+  test("test if timeseries load is successful when created on streaming table with day granularity") {
+    sql("drop table if exists timeseries_table")
+    createTable(tableName = "timeseries_table", streaming = true, withBatchLoad = false)
+    val identifier = new TableIdentifier("timeseries_table", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_day ON TABLE timeseries_table
+         | USING '${TIMESERIES.toString}'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='register',
+         | 'DAY_GRANULARITY'='1')
+         | AS SELECT register, SUM(id) FROM timeseries_table
+         | GROUP BY register
+       """.stripMargin)
+    sql("alter table timeseries_table finish streaming")
+    sql("alter table timeseries_table compact 'streaming'")
+    checkAnswer( sql("select * FROM timeseries_table_agg0_day"), Seq(Row(Timestamp.valueOf("2010-01-01 00:00:00.0"), 120)))
+  }
+
+  test("test if minor compaction is successful for streaming and preaggregate tables") {
+    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
+    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
+    sql("create datamap p2 on table agg_table2 using 'preaggregate' as select name, min(salary) from agg_table2 group by name")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    sql("alter table agg_table2 compact 'minor'")
+    checkAnswer(sql("select * from agg_table2_p1"),
+      Seq(
+        Row("name_10", 800000.0),
+        Row("name_11", 880000.0),
+        Row("name_12", 960000.0),
+        Row("name_13", 1040000.0),
+        Row("name_14", 1120000.0)))
+    assert(sql("show segments for table agg_table2").collect().map(_.get(0)).contains("1.1"))
+    assert(sql("show segments for table agg_table2_p1").collect().map(_.get(0)).contains("0.1"))
+    assert(sql("show segments for table agg_table2_p2").collect().map(_.get(0)).contains("0.1"))
+    sql("drop table if exists agg_table2")
+  }
+
+  test("test if major compaction is successful for streaming and preaggregate tables") {
+    sql("drop table if exists agg_table2")
+    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
+    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    sql("alter table agg_table2 compact 'major'")
+    checkAnswer(sql("select * from agg_table2_p1"),
+      Seq(
+        Row("name_10", 800000.0),
+        Row("name_11", 880000.0),
+        Row("name_12", 960000.0),
+        Row("name_13", 1040000.0),
+        Row("name_14", 1120000.0)))
+    assert(sql("show segments for table agg_table2").collect().map(_.get(0)).contains("1.1"))
+    assert(sql("show segments for table agg_table2_p1").collect().map(_.get(0)).contains("0.1"))
+    sql("drop table if exists agg_table2")
+  }
+
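+  // streams two identical 5-row CSV batches (ids 10..14) into streaming.agg_table2 via the file source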
+  def loadData(): Unit = {
+    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(2000)
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+    Thread.sleep(5000)
+    thread.interrupt()
+  }
+
+  test("test if data is displayed when alias is used for column name") {
+    sql("drop table if exists agg_table2")
+    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
+    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdata1").getCanonicalPath
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append)
+    // streaming ingest 10 rows
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.agg_table2"),
+      Seq(Row(10)))
+    sql(s"load data inpath '$csvDataDir' into table agg_table2 options('FILEHEADER'='id, name, city, salary, tax, percent, birthday, register, updated, file')")
+    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
+    // The aggregate query on the main table should cover both the streaming and the batch-loaded rows
+    checkAnswer(sql("select name as abc, sum(salary) as sal from agg_table2 group by name"),
+      Seq(
+        Row("name_14", 560000.0),
+        Row("name_10", 400000.0),
+        Row("name_12", 480000.0),
+        Row("name_11", 440000.0),
+        Row("name_13", 520000.0)))
+
+    sql("drop table agg_table2")
+  }
+
+  test("test if data is loaded in aggregate table after handoff is done for streaming table") {
+    createTable(tableName = "agg_table3", streaming = true, withBatchLoad = false)
+    val identifier = new TableIdentifier("agg_table3", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append)
+    // streaming ingest 10 rows
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.agg_table3"),
+      Seq(Row(10)))
+    sql("alter table agg_table3 finish streaming")
+    sql("alter table agg_table3 compact 'streaming'")
+    sql("create datamap p1 on table agg_table3 using 'preaggregate' as select name, sum(salary) from agg_table3 group by name")
+    // Data should be loaded into the aggregate table because handoff has been fired
+    checkAnswer(sql("select * from agg_table3_p1"),
+      Seq(
+        Row("name_10", 200000.0),
+        Row("name_11", 220000.0),
+        Row("name_12", 240000.0),
+        Row("name_13", 260000.0),
+        Row("name_14", 280000.0)))
+  }
+
+  // bad records
+  test("streaming table with bad records action: fail") {
+    executeStreamingIngest(
+      tableName = "bad_record_fail",
+      batchNums = 2,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 1,
+      intervalOfIngest = 1,
+      continueSeconds = 8,
+      generateBadRecords = true,
+      badRecordAction = "fail",
+      autoHandoff = false
+    )
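+    // with badRecordAction = 'fail', loads containing bad records are rejected, so only part of
+    // the streaming data (plus the 5 batch-loaded rows) is expected to land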
+    val result = sql("select count(*) from streaming.bad_record_fail").collect()
+    assert(result(0).getLong(0) < 10 + 5)
+  }
+
+  // ingest with different interval
+  test("1 row per 1 second interval") {
+    executeStreamingIngest(
+      tableName = "stream_table_1s",
+      batchNums = 3,
+      rowNumsEachBatch = 1,
+      intervalOfSource = 1,
+      intervalOfIngest = 1,
+      continueSeconds = 6,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      autoHandoff = false
+    )
+    val result = sql("select count(*) from streaming.stream_table_1s").collect()
+    // the ingest window may be too short to capture every row (data can lag), so only assert a lower bound
+    assert(result(0).getLong(0) >= 5)
+  }
+
+  test("query on stream table with dictionary, sort_columns") {
+    val batchParts =
+      partitionNums("select * from streaming.stream_table_filter")
+
+    executeStreamingIngest(
+      tableName = "stream_table_filter",
+      batchNums = 2,
+      rowNumsEachBatch = 25,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = true,
+      badRecordAction = "force",
+      autoHandoff = false
+    )
+
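+    // partitionNums (a helper defined elsewhere in this suite) presumably returns the number of
+    // scan partitions for a query; the comparisons below check that filters prune the
+    // streaming/batch segments as expected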
+    val totalParts =
+      partitionNums("select * from streaming.stream_table_filter")
+    assert(totalParts > batchParts)
+
+    val streamParts = totalParts - batchParts
+
+    // non-filter
+    val result = sql("select * from streaming.stream_table_filter order by id, name").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(1).isNullAt(0))
+    assert(result(1).getString(1) == "name_6")
+    // check one row of batch-loaded data
+    assert(result(50).getInt(0) == 100000001)
+    assert(result(50).getString(1) == "batch_1")
+
+    // filter
+    assert(batchParts >= partitionNums("select * from stream_table_filter where id >= 100000001"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id = 1"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where id > 49 and id < 100000002"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where id between 50 and 100000001"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name = 'name_3'"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where name = 'name_3'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name like '%me_3%' and id < 30"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where name like '%me_3%' and id < 30"))
+
+    checkAnswer(sql("select count(*) from stream_table_filter where name like '%ame%'"),
+      Seq(Row(49)))
+    assert(totalParts == partitionNums("select count(*) from stream_table_filter where name like '%ame%'"))
+
+    checkAnswer(sql("select count(*) from stream_table_filter where name like '%batch%'"),
+      Seq(Row(5)))
+    assert(totalParts == partitionNums("select count(*) from stream_table_filter where name like '%batch%'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name >= 'name_3' and id < 4"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where name >= 'name_3' and id < 4"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city = 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where city = 'city_1'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)"))
+
+    checkAnswer(sql("select count(*) from stream_table_filter where city like '%city%'"),
+      Seq(Row(54)))
+    assert(totalParts == partitionNums("select count(*) from stream_table_filter where city like '%city%'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city between 'city_09' and 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary = 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where salary = 90000"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary > 80000 and salary <= 100000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where salary > 80000 and salary <= 100000"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary between 80001 and 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where salary between 80001 and 90000"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax = 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where tax = 0.04 and id < 100"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax >= 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where tax >= 0.04 and id < 100"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and tax <> 0.01"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and tax <> 0.01"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent = 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where percent = 80.04 and id < 100"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent >= 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where percent >= 80.04 and id < 100"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and percent <> 80.01"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and percent <> 80.01"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday = '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where birthday = '1990-01-04'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where birthday > '1990-01-03' and birthday <= '1990-01-04'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where register = '2010-01-04 10:01:01'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where updated = '2010-01-04 10:01:01'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null order by name"),
+      Seq(Row(null, "", "", null, null, null, null, null, null),
+        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts >= partitionNums("select * from stream_table_filter where id is null order by name"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where name = ''"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and name <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and name <> ''"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(streamParts >= partitionNums("select * from stream_table_filter where city = ''"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and city <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and city <> ''"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where salary is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(totalParts == partitionNums("select * from stream_table_filter where salary is null"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and salary is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and salary is not null"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where tax is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(totalParts == partitionNums("select * from stream_table_filter where tax is null"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and tax is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and tax is not null"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where percent is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(totalParts == partitionNums("select * from stream_table_filter where percent is null"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and percent is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and percent is not null"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where birthday is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(1 == partitionNums("select * from stream_table_filter where birthday is null"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and birthday is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and birthday is not null"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where register is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(1 == partitionNums("select * from stream_table_filter where register is null"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and register is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and register is not null"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where updated is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null)))
+    assert(3 == partitionNums("select * from stream_table_filter where updated is null"))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null and updated is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+    assert(totalParts == partitionNums("select * from stream_table_filter where id is null and updated is not null"))
+
+    // agg
+    checkAnswer(
+      sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
+          "from stream_table_filter where id >= 2 and id <= 100000004"),
+      Seq(Row(51, 100000004, "batch_1", 7843162, 400001276)))
+    assert(totalParts >= partitionNums(
+      "select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
+      "from stream_table_filter where id >= 2 and id <= 100000004"))
+
+    checkAnswer(
+      sql("select city, count(id), sum(id), cast(avg(id) as integer), " +
+          "max(salary), min(salary) " +
+          "from stream_table_filter " +
+          "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+          "and city <> '' " +
+          "group by city " +
+          "order by city"),
+      Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1),
+        Row("city_2", 1, 100000002, 100000002, 0.2, 0.2),
+        Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3)))
+    assert(totalParts >= partitionNums(
+      "select city, count(id), sum(id), cast(avg(id) as integer), " +
+      "max(salary), min(salary) " +
+      "from stream_table_filter " +
+      "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+      "and city <> '' " +
+      "group by city " +
+      "order by city"))
+
+    // batch loading
+    for (_ <- 0 to 2) {
+      executeBatchLoad("stream_table_filter")
+    }
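+    // expected count: 25 rows x 2 streaming batches, plus the 5-row batch
+    // loads (one executed earlier in this test, three in the loop above)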
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_filter"),
+      Seq(Row(25 * 2 + 5 + 5 * 3)))
+
+    sql("alter table streaming.stream_table_filter compact 'minor'")
+    Thread.sleep(5000)
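+    // after minor compaction: segment 1 is still the active streaming segment
+    // (row format), segment 0.1 is the compacted columnar segment, and the
+    // remaining segments are marked as compacted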
+    val result1 = sql("show segments for table streaming.stream_table_filter").collect()
+    result1.foreach { row =>
+      if (row.getString(0).equals("1")) {
+        assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
+        assertResult(FileFormat.ROW_V1.toString)(row.getString(5))
+      } else if (row.getString(0).equals("0.1")) {
+        assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+      } else {
+        assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+      }
+    }
+
+  }
+
+  test("query on stream table with dictionary, sort_columns and complex column") {
+    executeStreamingIngest(
+      tableName = "stream_table_filter_complex",
+      batchNums = 2,
+      rowNumsEachBatch = 25,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = true,
+      badRecordAction = "force",
+      autoHandoff = false
+    )
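+    // 2 batches x 25 streaming rows plus 5 batch-loaded rows = 55 rows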
+
+    // non-filter
+    val result = sql("select * from streaming.stream_table_filter_complex order by id, name").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(0).isNullAt(0))
+    assert(result(0).getString(1) == "")
+    assert(result(0).getStruct(9).isNullAt(1))
+    // check one row of batch loading
+    assert(result(50).getInt(0) == 100000001)
+    assert(result(50).getString(1) == "batch_1")
+    assert(result(50).getStruct(9).getInt(1) == 20)
+
+    // filter
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name = 'name_3'"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name like '%me_3%' and id < 30"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%ame%'"),
+      Seq(Row(49)))
+
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%batch%'"),
+      Seq(Row(5)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name >= 'name_3' and id < 4"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city = 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where city like '%city%'"),
+      Seq(Row(54)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city > 'city_09' and city < 'city_10'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city between 'city_09' and 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary = 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary > 80000 and salary <= 100000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_10", "school_1010")), 10))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary between 80001 and 90000"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax = 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax >= 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax < 0.05 and tax > 0.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax between 0.02 and 0.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent = 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent >= 80.04 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent < 80.05 and percent > 80.02 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent between 80.02 and 80.05 and id < 100"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday = '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")),50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated = '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+        Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+        Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null order by name"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)),
+        Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and name <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city = ''"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and city <> ''"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where salary is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and salary is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where tax is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and tax is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where percent is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and salary is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where birthday is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where register is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and register is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where updated is null"),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null and updated is not null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+    // agg
+    checkAnswer(
+      sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age) " +
+          "from stream_table_filter_complex where id >= 2 and id <= 100000004"),
+      Seq(Row(51, 100000004, "batch_1", 27, 1406)))
+
+    checkAnswer(
+      sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " +
+          "max(salary), min(salary) " +
+          "from stream_table_filter_complex " +
+          "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+          "and city <> '' " +
+          "group by city " +
+          "order by city"),
+      Seq(Row("city_1", 2, 100000002, 10, 10000.0, 0.1),
+        Row("city_2", 1, 100000002, 30, 0.2, 0.2),
+        Row("city_3", 2, 100000006, 21, 30000.0, 0.3)))
+  }
+
+  test("test deleting streaming segment by ID while ingesting") {
+    executeStreamingIngest(
+      tableName = "stream_table_delete_id",
+      batchNums = 3,
+      rowNumsEachBatch = 100,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 18,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1,
+      autoHandoff = false
+    )
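+    // the active 'Streaming' segment cannot be deleted by id; only the
+    // handed-off 'Streaming Finish' segments can be deleted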
+    val beforeDelete = sql("show segments for table streaming.stream_table_delete_id").collect()
+    val segmentIds1 = beforeDelete.filter(_.getString(1).equals("Streaming")).map(_.getString(0)).mkString(",")
+    val msg = intercept[Exception] {
+      sql(s"delete from table streaming.stream_table_delete_id where segment.id in ($segmentIds1) ")
+    }
+    assertResult(s"Delete segment by Id is failed. Invalid ID is: ${beforeDelete.length -1}")(msg.getMessage)
+
+    val segmentIds2 = beforeDelete.filter(_.getString(1).equals("Streaming Finish"))
+      .map(_.getString(0)).mkString(",")
+    sql(s"delete from table streaming.stream_table_delete_id where segment.id in ($segmentIds2) ")
+    val afterDelete = sql("show segments for table streaming.stream_table_delete_id").collect()
+    afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row =>
+      assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
+    }
+  }
+
+  test("test deleting streaming segment by date while ingesting") {
+    executeStreamingIngest(
+      tableName = "stream_table_delete_date",
+      batchNums = 3,
+      rowNumsEachBatch = 100,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 18,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1,
+      autoHandoff = false)
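+    // delete-by-date marks every finished segment for delete but keeps the
+    // single active streaming segment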
+    val beforeDelete = sql("show segments for table streaming.stream_table_delete_date").collect()
+    sql(s"delete from table streaming.stream_table_delete_date where segment.starttime before " +
+        s"'2999-10-01 01:00:00'")
+    val streamingSegments = beforeDelete.filter(_.getString(1).equals("Streaming"))
+    assertResult(1)(streamingSegments.length)
+    val afterDelete = sql("show segments for table streaming.stream_table_delete_date").collect()
+    afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row =>
+      assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
+    }
+  }
+
+  test("reject alter streaming properties and handoff 'streaming finish' segment to columnar segment") {
+    // unsetting the 'streaming' table property is not supported and must throw
+    intercept[Exception] {
+      sql("ALTER TABLE streaming.stream_table_handoff UNSET TBLPROPERTIES IF EXISTS ('streaming')")
+    }
+    try {
+      sql("ALTER TABLE streaming.stream_table_handoff SET TBLPROPERTIES('streaming'='true')")
+      executeStreamingIngest(
+        tableName = "stream_table_handoff",
+        batchNums = 2,
+        

<TRUNCATED>