You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/27 08:18:59 UTC
[02/16] carbondata git commit: [CARBONDATA-2151][Streaming] Fix filter query issue on streaming table
[CARBONDATA-2151][Streaming] Fix filter query issue on streaming table
1. Fix filter query issues for timestamp, date, and decimal columns
2. Add more test cases
dataType: int, string, float, double, decimal, timestamp, date, complex
operation: =, <>, >=, >, <, <=, in, like, between, is null, is not null
This closes #1969
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f8a62a9b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f8a62a9b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f8a62a9b
Branch: refs/heads/branch-1.3
Commit: f8a62a9bd8ba39cd6bc247c587a7a3e1afd99254
Parents: c2785b3
Author: QiangCai <qi...@qq.com>
Authored: Sun Feb 11 16:06:01 2018 +0800
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Feb 27 12:46:13 2018 +0530
----------------------------------------------------------------------
.../carbondata/core/scan/filter/FilterUtil.java | 4 +
.../executer/ExcludeFilterExecuterImpl.java | 2 +-
.../scan/filter/executer/FilterExecuter.java | 4 +
.../executer/RangeValueFilterExecuterImpl.java | 29 +
.../executer/RowLevelFilterExecuterImpl.java | 3 +-
.../RowLevelRangeGrtThanFiterExecuterImpl.java | 18 +
...elRangeGrtrThanEquaToFilterExecuterImpl.java | 18 +
...velRangeLessThanEqualFilterExecuterImpl.java | 18 +
.../RowLevelRangeLessThanFiterExecuterImpl.java | 18 +
.../streaming/CarbonStreamRecordReader.java | 2 +-
.../src/test/resources/streamSample.csv | 12 +-
.../TestStreamingTableOperation.scala | 547 +++++++++++++++++--
12 files changed, 612 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 689da9f..8dcac30 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -1790,6 +1790,10 @@ public final class FilterUtil {
}
public static void updateIndexOfColumnExpression(Expression exp, int dimOridnalMax) {
+ // if expression is null, not require to update index.
+ if (exp == null) {
+ return;
+ }
if (exp.getChildren() == null || exp.getChildren().size() == 0) {
if (exp instanceof ColumnExpression) {
ColumnExpression ce = (ColumnExpression) exp;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 465bee6..fad37fc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -135,7 +135,7 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
@Override public boolean applyFilter(RowIntf value, int dimOrdinalMax) {
if (isDimensionPresentInCurrentBlock) {
- byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+ byte[][] filterValues = dimColumnExecuterInfo.getExcludeFilterKeys();
byte[] col = (byte[])value.getVal(dimColEvaluatorInfo.getDimension().getOrdinal());
for (int i = 0; i < filterValues.length; i++) {
if (0 == ByteUtil.UnsafeComparer.INSTANCE.compareTo(col, 0, col.length,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
index 85891dc..53d3068 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterExecuter.java
@@ -35,6 +35,10 @@ public interface FilterExecuter {
BitSetGroup applyFilter(BlocksChunkHolder blocksChunkHolder, boolean useBitsetPipeLine)
throws FilterUnsupportedException, IOException;
+ /**
+ * apply range filter on a row
+ * @return true: if the value satisfy the filter; or else false.
+ */
boolean applyFilter(RowIntf value, int dimOrdinalMax)
throws FilterUnsupportedException, IOException;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
index ee373c5..797fe9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExp
import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -146,6 +147,34 @@ public class RangeValueFilterExecuterImpl extends ValueBasedFilterExecuterImpl {
}
/**
+ * apply range filter on a row
+ */
+ public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+ throws FilterUnsupportedException, IOException {
+
+ byte[] col = (byte[]) value.getVal(dimColEvaluatorInfo.getDimension().getOrdinal());
+ byte[][] filterValues = this.filterRangesValues;
+
+ if (isDimensionPresentInCurrentBlock) {
+ boolean result;
+ if (greaterThanExp) {
+ result = ByteUtil.compare(filterValues[0], col) < 0;
+ } else {
+ result = ByteUtil.compare(filterValues[0], col) <= 0;
+ }
+
+ if (result) {
+ if (lessThanExp) {
+ return ByteUtil.compare(filterValues[1], col) > 0;
+ } else {
+ return ByteUtil.compare(filterValues[1], col) >= 0;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
* Method to find presence of LessThan Expression.
* @return
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 89489a2..8956f30 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -272,7 +272,8 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
return bitSetGroup;
}
- @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+ @Override
+ public boolean applyFilter(RowIntf value, int dimOrdinalMax)
throws FilterUnsupportedException, IOException {
try {
return exp.evaluate(value).getBoolean();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index 306f3fa..3981211 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -248,6 +249,23 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
return null;
}
+ @Override
+ public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+ throws FilterUnsupportedException, IOException {
+ if (isDimensionPresentInCurrentBlock[0]) {
+ byte[] col =
+ (byte[]) value.getVal(dimColEvaluatorInfoList.get(0).getDimension().getOrdinal());
+ return ByteUtil.compare(filterRangeValues[0], col) < 0;
+ }
+
+ if (isMeasurePresentInCurrentBlock[0]) {
+ Object col =
+ value.getVal(msrColEvalutorInfoList.get(0).getMeasure().getOrdinal() + dimOrdinalMax);
+ return comparator.compare(msrFilterRangeValues[0], col) < 0;
+ }
+ return false;
+ }
+
private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
int numerOfRows) {
BitSet bitSet = new BitSet(numerOfRows);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index db55e42..f2ddcb6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -248,6 +249,23 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
return null;
}
+ @Override
+ public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+ throws FilterUnsupportedException, IOException {
+ if (isDimensionPresentInCurrentBlock[0]) {
+ byte[] col =
+ (byte[]) value.getVal(dimColEvaluatorInfoList.get(0).getDimension().getOrdinal());
+ return ByteUtil.compare(filterRangeValues[0], col) <= 0;
+ }
+
+ if (isMeasurePresentInCurrentBlock[0]) {
+ Object col =
+ value.getVal(msrColEvalutorInfoList.get(0).getMeasure().getOrdinal() + dimOrdinalMax);
+ return comparator.compare(msrFilterRangeValues[0], col) <= 0;
+ }
+ return false;
+ }
+
private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
int numerOfRows) {
BitSet bitSet = new BitSet(numerOfRows);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 88cf75c..a44bc1a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -230,6 +231,23 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
return null;
}
+ @Override
+ public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+ throws FilterUnsupportedException, IOException {
+ if (isDimensionPresentInCurrentBlock[0]) {
+ byte[] col =
+ (byte[]) value.getVal(dimColEvaluatorInfoList.get(0).getDimension().getOrdinal());
+ return ByteUtil.compare(filterRangeValues[0], col) >= 0;
+ }
+
+ if (isMeasurePresentInCurrentBlock[0]) {
+ Object col =
+ value.getVal(msrColEvalutorInfoList.get(0).getMeasure().getOrdinal() + dimOrdinalMax);
+ return comparator.compare(msrFilterRangeValues[0], col) >= 0;
+ }
+ return false;
+ }
+
private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
int numerOfRows) {
BitSet bitSet = new BitSet(numerOfRows);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 7f735c2..447ab46 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -232,6 +233,23 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
return null;
}
+ @Override
+ public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+ throws FilterUnsupportedException, IOException {
+ if (isDimensionPresentInCurrentBlock[0]) {
+ byte[] col =
+ (byte[]) value.getVal(dimColEvaluatorInfoList.get(0).getDimension().getOrdinal());
+ return ByteUtil.compare(filterRangeValues[0], col) > 0;
+ }
+
+ if (isMeasurePresentInCurrentBlock[0]) {
+ Object col =
+ value.getVal(msrColEvalutorInfoList.get(0).getMeasure().getOrdinal() + dimOrdinalMax);
+ return comparator.compare(msrFilterRangeValues[0], col) > 0;
+ }
+ return false;
+ }
+
private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
int numerOfRows) {
BitSet bitSet = new BitSet(numerOfRows);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index 19626f0..773089b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -607,7 +607,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
filterValues[filterMap[colCount]] = v;
}
if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = v;
+ outputValues[projectionMap[colCount]] = Decimal.apply(v);
}
} else {
input.skipBytes(len);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/integration/spark-common-test/src/test/resources/streamSample.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/streamSample.csv b/integration/spark-common-test/src/test/resources/streamSample.csv
index 590ea90..3443237 100644
--- a/integration/spark-common-test/src/test/resources/streamSample.csv
+++ b/integration/spark-common-test/src/test/resources/streamSample.csv
@@ -1,6 +1,6 @@
-id,name,city,salary,file
-100000001,batch_1,city_1,0.1,school_1:school_11$20
-100000002,batch_2,city_2,0.2,school_2:school_22$30
-100000003,batch_3,city_3,0.3,school_3:school_33$40
-100000004,batch_4,city_4,0.4,school_4:school_44$50
-100000005,batch_5,city_5,0.5,school_5:school_55$60
+id,name,city,salary,tax,percent,birthday,register,updated,file
+100000001,batch_1,city_1,0.1,0.01,80.01,1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01,school_1:school_11$20
+100000002,batch_2,city_2,0.2,0.02,80.02,1990-01-02,2010-01-02 10:01:01,2010-01-02 10:01:01,school_2:school_22$30
+100000003,batch_3,city_3,0.3,0.03,80.03,1990-01-03,2010-01-03 10:01:01,2010-01-03 10:01:01,school_3:school_33$40
+100000004,batch_4,city_4,0.4,0.04,80.04,1990-01-04,2010-01-04 10:01:01,2010-01-04 10:01:01,school_4:school_44$50
+100000005,batch_5,city_5,0.5,0.05,80.05,1990-01-05,2010-01-05 10:01:01,2010-01-05 10:01:01,school_5:school_55$60
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8a62a9b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 4b3a957..94baf86 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -18,7 +18,9 @@
package org.apache.spark.carbondata
import java.io.{File, PrintWriter}
+import java.math.BigDecimal
import java.net.{BindException, ServerSocket}
+import java.sql.{Date, Timestamp}
import java.util.concurrent.Executors
import scala.collection.mutable
@@ -28,14 +30,13 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.{CarbonEnv, Row, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.types.StructType
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}
-import org.apache.carbondata.streaming.CarbonStreamException
class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
@@ -43,6 +44,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
private val dataFilePath = s"$resourcesPath/streamSample.csv"
override def beforeAll {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
sql("DROP DATABASE IF EXISTS streaming CASCADE")
sql("CREATE DATABASE streaming")
sql("USE streaming")
@@ -235,7 +242,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
)
val row = sql("select * from streaming.stream_table_file order by id").head()
- assertResult(Row(10, "name_10", "city_10", 100000.0))(row)
+ val exceptedRow = Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))
+ assertResult(exceptedRow)(row)
}
// bad records
@@ -287,12 +295,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
)
// non-filter
- val result = sql("select * from streaming.stream_table_filter order by id").collect()
+ val result = sql("select * from streaming.stream_table_filter order by id, name").collect()
assert(result != null)
assert(result.length == 55)
// check one row of streaming data
- assert(result(1).getInt(0) == 1)
- assert(result(1).getString(1) == "name_1")
+ assert(result(1).isNullAt(0))
+ assert(result(1).getString(1) == "name_6")
// check one row of batch loading
assert(result(50).getInt(0) == 100000001)
assert(result(50).getString(1) == "batch_1")
@@ -300,35 +308,259 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
// filter
checkAnswer(
sql("select * from stream_table_filter where id = 1"),
- Seq(Row(1, "name_1", "city_1", 10000.0)))
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
- sql("select * from stream_table_filter where name = 'name_2'"),
- Seq(Row(2, "name_2", "", 20000.0)))
+ sql("select * from stream_table_filter where id > 49 and id < 100000002"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id between 50 and 100000001"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where name = 'name_3'"),
+ Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where name like '%me_3%' and id < 30"),
+ Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where name >= 'name_3' and id < 4"),
+ Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where city = 'city_1'"),
- Seq(Row(1, "name_1", "city_1", 10000.0),
- Row(100000001, "batch_1", "city_1", 0.1)))
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
- sql("select * from stream_table_filter where id > 49 and id < 100000002"),
- Seq(Row(50, "name_50", "city_50", 500000.0),
- Row(100000001, "batch_1", "city_1", 0.1)))
+ sql("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where city between 'city_09' and 'city_1'"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where salary = 90000"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where salary > 80000 and salary <= 100000"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where salary between 80001 and 90000"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where tax = 0.04 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where tax >= 0.04 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id in (9, 10) and tax <> 0.01"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where percent = 80.04 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where percent >= 80.04 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id in (9, 10) and percent <> 80.01"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
checkAnswer(
- sql("select * from stream_table_filter where id is null"),
- Seq(Row(null, "name_6", "city_6", 60000.0)))
+ sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where birthday = '1990-01-04'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where register = '2010-01-04 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where updated = '2010-01-04 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer( // range test on `updated`: both bounds must use `updated`, mirroring the `register` range test
+ sql("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and updated <= '2010-01-04 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
+ Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id is null order by name"),
+ Seq(Row(null, "", "", null, null, null, null, null, null),
+ Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where name = ''"),
+ Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id is null and name <> ''"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
checkAnswer(
sql("select * from stream_table_filter where city = ''"),
- Seq(Row(2, "name_2", "", 20000.0)))
+ Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id is null and city <> ''"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where salary is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id is null and salary is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where tax is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id is null and tax is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where percent is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id is null and percent is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where birthday is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id is null and birthday is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where register is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id is null and register is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where updated is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id is null and updated is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
// agg
checkAnswer(
sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
"from stream_table_filter where id >= 2 and id <= 100000004"),
- Seq(Row(52, 100000004, "batch_1", 7692332, 400001278)))
+ Seq(Row(51, 100000004, "batch_1", 7843162, 400001276)))
checkAnswer(
sql("select city, count(id), sum(id), cast(avg(id) as integer), " +
@@ -382,54 +614,238 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
)
// non-filter
- val result = sql("select * from streaming.stream_table_filter_complex order by id").collect()
+ val result = sql("select * from streaming.stream_table_filter_complex order by id, name").collect()
assert(result != null)
assert(result.length == 55)
// check one row of streaming data
assert(result(0).isNullAt(0))
- assert(result(0).getString(1) == "name_6")
- assert(result(0).getStruct(4).getInt(1) == 6)
+ assert(result(0).getString(1) == "")
+ assert(result(0).getStruct(9).isNullAt(1))
// check one row of batch loading
assert(result(50).getInt(0) == 100000001)
assert(result(50).getString(1) == "batch_1")
- assert(result(50).getStruct(4).getInt(1) == 20)
+ assert(result(50).getStruct(9).getInt(1) == 20)
// filter
checkAnswer(
sql("select * from stream_table_filter_complex where id = 1"),
- Seq(Row(1, "name_1", "city_1", 10000.0, Row(wrap(Array("school_1", "school_11")), 1))))
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id > 49 and id < 100000002"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id between 50 and 100000001"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where name = 'name_3'"),
+ Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
checkAnswer(
- sql("select * from stream_table_filter_complex where name = 'name_2'"),
- Seq(Row(2, "name_2", "", 20000.0, Row(wrap(Array("school_2", "school_22")), 2))))
+ sql("select * from stream_table_filter_complex where name like '%me_3%' and id < 30"),
+ Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
checkAnswer(
- sql("select * from stream_table_filter_complex where file.age = 3"),
- Seq(Row(3, "name_3", "city_3", 30000.0, Row(wrap(Array("school_3", "school_33")), 3))))
+ sql("select * from stream_table_filter_complex where name >= 'name_3' and id < 4"),
+ Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3))))
checkAnswer(
sql("select * from stream_table_filter_complex where city = 'city_1'"),
- Seq(Row(1, "name_1", "city_1", 10000.0, Row(wrap(Array("school_1", "school_11")), 1)),
- Row(100000001, "batch_1", "city_1", 0.1, Row(wrap(Array("school_1", "school_11")), 20))))
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
checkAnswer(
- sql("select * from stream_table_filter_complex where id > 49 and id < 100000002"),
- Seq(Row(50, "name_50", "city_50", 500000.0, Row(wrap(Array("school_50", "school_5050")), 50)),
- Row(100000001, "batch_1", "city_1", 0.1, Row(wrap(Array("school_1", "school_11")), 20))))
+ sql("select * from stream_table_filter_complex where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where city > 'city_09' and city < 'city_10'"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where city between 'city_09' and 'city_1'"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where salary = 90000"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where salary > 80000 and salary <= 100000"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+ Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_10", "school_1010")), 10))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where salary between 80001 and 90000"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where tax = 0.04 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where tax >= 0.04 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where tax < 0.05 and tax > 0.02 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where tax between 0.02 and 0.04 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where percent = 80.04 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where percent >= 80.04 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where percent < 80.05 and percent > 80.02 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where percent between 80.02 and 80.05 and id < 100"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+ Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where birthday = '1990-01-04'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
checkAnswer(
- sql("select * from stream_table_filter_complex where id is null"),
- Seq(Row(null, "name_6", "city_6", 60000.0, Row(wrap(Array("school_6", "school_66")), 6))))
+ sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+ Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where register = '2010-01-04 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")),50)),
+ Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where updated = '2010-01-04 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+ checkAnswer( // range test on `updated`: both bounds must use `updated`, mirroring the `register` range test
+ sql("select * from stream_table_filter_complex where updated > '2010-01-03 10:01:01' and updated <= '2010-01-04 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
+ Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)),
+ Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)),
+ Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id is null order by name"),
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null)),
+ Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where name = ''"),
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id is null and name <> ''"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
checkAnswer(
sql("select * from stream_table_filter_complex where city = ''"),
- Seq(Row(2, "name_2", "", 20000.0, Row(wrap(Array("school_2", "school_22")), 2))))
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id is null and city <> ''"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where salary is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id is null and salary is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where tax is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id is null and tax is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where percent is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+ checkAnswer( // counterpart of the `percent is null` check: exercises `is not null` on the double column
+ sql("select * from stream_table_filter_complex where id is null and percent is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where birthday is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where register is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id is null and register is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where updated is null"),
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null, null)), null))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id is null and updated is not null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
// agg
checkAnswer(
sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age) " +
"from stream_table_filter_complex where id >= 2 and id <= 100000004"),
- Seq(Row(52, 100000004, "batch_1", 27, 1408)))
+ Seq(Row(51, 100000004, "batch_1", 27, 1406)))
checkAnswer(
sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " +
@@ -715,6 +1131,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
val clientSocket = serverSocket.accept()
val socketWriter = new PrintWriter(clientSocket.getOutputStream())
var index = 0
+ var timeRow = true
for (_ <- 1 to writeNums) {
// write 5 records per iteration
val stringBuilder = new StringBuilder()
@@ -723,22 +1140,32 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
if (badRecords) {
if (index == 2) {
// null value
- stringBuilder.append(index.toString + ",name_" + index
- + ",," + (10000.00 * index).toString +
- ",school_" + index + ":school_" + index + index + "$" + index)
+ stringBuilder.append(",,,,,,,,,")
} else if (index == 6) {
// illegal number
stringBuilder.append(index.toString + "abc,name_" + index
- + ",city_" + index + "," + (10000.00 * index).toString +
+ + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+ ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
",school_" + index + ":school_" + index + index + "$" + index)
} else {
- stringBuilder.append(index.toString + ",name_" + index
- + ",city_" + index + "," + (10000.00 * index).toString +
- ",school_" + index + ":school_" + index + index + "$" + index)
+
+ if (index == 9 && timeRow) {
+ timeRow = false
+ stringBuilder.append(index.toString + ",name_" + index
+ + ",city_" + index + "," + (10000.00 * index).toString + ",0.04,80.04" +
+ ",1990-01-04,2010-01-04 10:01:01,2010-01-04 10:01:01" +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ } else {
+ stringBuilder.append(index.toString + ",name_" + index
+ + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+ ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ }
}
} else {
stringBuilder.append(index.toString + ",name_" + index
- + ",city_" + index + "," + (10000.00 * index).toString +
+ + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+ ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01" +
",school_" + index + ":school_" + index + index + "$" + index)
}
stringBuilder.append("\n")
@@ -781,6 +1208,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
.option("dbName", tableIdentifier.database.get)
.option("tableName", tableIdentifier.table)
.option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
+ .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
.start()
qry.awaitTermination()
@@ -857,9 +1285,15 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
"name_" + id,
"city_" + id,
10000.00 * id,
+ BigDecimal.valueOf(0.01),
+ 80.01,
+ "1990-01-01",
+ "2010-01-01 10:01:01",
+ "2010-01-01 10:01:01",
"school_" + id + ":school_" + id + id + "$" + id)
}
- val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary", "file")
+ val csvDataDF = spark.createDataFrame(csvRDD).toDF(
+ "id", "name", "city", "salary", "tax", "percent", "birthday", "register", "updated", "file")
csvDataDF.write
.option("header", "false")
@@ -875,12 +1309,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
tableIdentifier: TableIdentifier): Thread = {
new Thread() {
override def run(): Unit = {
- val inputSchema = new StructType()
- .add("id", "integer")
- .add("name", "string")
- .add("city", "string")
- .add("salary", "float")
- .add("file", "string")
var qry: StreamingQuery = null
try {
val readSocketDF = spark.readStream.text(csvDataDir)
@@ -892,6 +1320,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
.option("dbName", tableIdentifier.database.get)
.option("tableName", tableIdentifier.table)
+ .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.start()
qry.awaitTermination()
@@ -914,11 +1343,16 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
| id INT,
| name STRING,
| city STRING,
- | salary FLOAT
+ | salary FLOAT,
+ | tax DECIMAL(8,2),
+ | percent double,
+ | birthday DATE,
+ | register TIMESTAMP,
+ | updated TIMESTAMP
| )
| STORED BY 'carbondata'
| TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
- | 'sort_columns'='name', 'dictionary_include'='city')
+ | 'sort_columns'='name', 'dictionary_include'='city,register')
| """.stripMargin)
if (withBatchLoad) {
@@ -938,11 +1372,16 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
| name STRING,
| city STRING,
| salary FLOAT,
+ | tax DECIMAL(8,2),
+ | percent double,
+ | birthday DATE,
+ | register TIMESTAMP,
+ | updated TIMESTAMP,
| file struct<school:array<string>, age:int>
| )
| STORED BY 'carbondata'
| TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
- | 'sort_columns'='name', 'dictionary_include'='city')
+ | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated')
| """.stripMargin)
if (withBatchLoad) {