Posted to commits@carbondata.apache.org by ja...@apache.org on 2019/12/13 16:32:41 UTC
[carbondata] branch master updated: [CARBONDATA-3611] Fix failed when filter with measure columns on stream table when this stream table includes complex columns
This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 3ebf7f6 [CARBONDATA-3611] Fix failed when filter with measure columns on stream table when this stream table includes complex columns
3ebf7f6 is described below
commit 3ebf7f61f3326bdc9f16a1665d44bea82828d3f7
Author: Zhang Zhichao <44...@qq.com>
AuthorDate: Sat Dec 7 11:25:04 2019 +0800
[CARBONDATA-3611] Fix failed when filter with measure columns on stream table when this stream table includes complex columns
Problem:
Filtering on measure columns fails for a stream table when the table also includes complex columns.
Solution:
Use 'segmentProperties.getDimensions().size()' instead of 'segmentProperties.getLastDimensionColOrdinal()' when setting 'columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray' for stream data files.
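As a rough illustration (not part of the patch; the helper name and signature are hypothetical), the offset into the min/max byte array for a measure column can be sketched as follows, using only the SegmentProperties and CarbonColumn calls already referenced in this change:

    // Hypothetical helper sketching the index computation this fix performs.
    // For stream data files the cached min/max covers only the flat dimension
    // list, so the measure section starts at getDimensions().size(); for
    // columnar files, complex columns are expanded and
    // getLastDimensionColOrdinal() is the correct offset.
    private static int minMaxIndexForMeasure(SegmentProperties segmentProperties,
        CarbonColumn measureColumn, boolean isStreamDataFile) {
      int dimensionOffset = isStreamDataFile
          ? segmentProperties.getDimensions().size()
          : segmentProperties.getLastDimensionColOrdinal();
      return dimensionOffset + measureColumn.getOrdinal();
    }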
This closes #3503
---
.../indexstore/blockletindex/BlockDataMap.java | 8 +-
.../scan/executor/impl/AbstractQueryExecutor.java | 2 +-
.../carbondata/core/scan/filter/FilterUtil.java | 57 ++--
.../carbondata/core/stream/StreamPruner.java | 2 +-
.../datamap/examples/MinMaxIndexDataMap.java | 2 +-
.../hadoop/stream/StreamRecordReader.java | 4 +-
.../carbondata/TestStreamingTableQueryFilter.scala | 315 +++++++++++++++++++++
7 files changed, 360 insertions(+), 30 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index e83985f..c865603 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -610,8 +610,8 @@ public class BlockDataMap extends CoarseGrainDataMap
@Override
public boolean isScanRequired(FilterResolverIntf filterExp) {
- FilterExecuter filterExecuter = FilterUtil
- .getFilterExecuterTree(filterExp, getSegmentProperties(), null, getMinMaxCacheColumns());
+ FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(
+ filterExp, getSegmentProperties(), null, getMinMaxCacheColumns(), false);
DataMapRow unsafeRow = taskSummaryDMStore
.getDataMapRow(getTaskSummarySchema(), taskSummaryDMStore.getRowCount() - 1);
boolean isScanRequired = FilterExpressionProcessor
@@ -741,8 +741,8 @@ public class BlockDataMap extends CoarseGrainDataMap
// Remove B-tree jump logic as start and end key prepared is not
// correct for old store scenarios
int entryIndex = 0;
- FilterExecuter filterExecuter = FilterUtil
- .getFilterExecuterTree(filterExp, getSegmentProperties(), null, getMinMaxCacheColumns());
+ FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(
+ filterExp, getSegmentProperties(), null, getMinMaxCacheColumns(), false);
// flag to be used for deciding whether use min/max in executor pruning for BlockletDataMap
boolean useMinMaxForPruning = useMinMaxForExecutorPruning(filterExp);
// min and max for executor pruning
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 9688868..c891ba2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -522,7 +522,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
filterResolverIntf = queryModel.getDataMapFilter().getResolver();
blockExecutionInfo.setFilterExecuterTree(
FilterUtil.getFilterExecuterTree(filterResolverIntf, segmentProperties,
- blockExecutionInfo.getComlexDimensionInfoMap()));
+ blockExecutionInfo.getComlexDimensionInfoMap(), false));
}
try {
startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index a096051..679ee43 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -134,12 +134,16 @@ public final class FilterUtil {
*
* @param filterExpressionResolverTree
* @param segmentProperties
+ * @param complexDimensionInfoMap
+ * @param minMaxCacheColumns
+ * @param isStreamDataFile whether to create the filter executer tree for stream data files
* @return FilterExecuter instance
+ *
*/
private static FilterExecuter createFilterExecuterTree(
FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
Map<Integer, GenericQueryType> complexDimensionInfoMap,
- List<CarbonColumn> minMaxCacheColumns) {
+ List<CarbonColumn> minMaxCacheColumns, boolean isStreamDataFile) {
FilterExecuterType filterExecuterType = filterExpressionResolverTree.getFilterExecuterType();
if (null != filterExecuterType) {
switch (filterExecuterType) {
@@ -154,7 +158,7 @@ public final class FilterUtil {
}
// return true filter expression if filter column min/max is not cached in driver
if (checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(filterExpressionResolverTree,
- segmentProperties, minMaxCacheColumns)) {
+ segmentProperties, minMaxCacheColumns, isStreamDataFile)) {
return new TrueFilterExecutor();
}
return getIncludeFilterExecuter(
@@ -167,15 +171,15 @@ public final class FilterUtil {
case OR:
return new OrFilterExecuterImpl(
createFilterExecuterTree(filterExpressionResolverTree.getLeft(), segmentProperties,
- complexDimensionInfoMap, minMaxCacheColumns),
+ complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile),
createFilterExecuterTree(filterExpressionResolverTree.getRight(), segmentProperties,
- complexDimensionInfoMap, minMaxCacheColumns));
+ complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile));
case AND:
return new AndFilterExecuterImpl(
createFilterExecuterTree(filterExpressionResolverTree.getLeft(), segmentProperties,
- complexDimensionInfoMap, minMaxCacheColumns),
+ complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile),
createFilterExecuterTree(filterExpressionResolverTree.getRight(), segmentProperties,
- complexDimensionInfoMap, minMaxCacheColumns));
+ complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile));
case ROWLEVEL_LESSTHAN:
case ROWLEVEL_LESSTHAN_EQUALTO:
case ROWLEVEL_GREATERTHAN_EQUALTO:
@@ -186,7 +190,7 @@ public final class FilterUtil {
if (checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(
rowLevelRangeFilterResolver.getDimColEvaluatorInfoList(),
rowLevelRangeFilterResolver.getMsrColEvalutorInfoList(), segmentProperties,
- minMaxCacheColumns)) {
+ minMaxCacheColumns, isStreamDataFile)) {
return new TrueFilterExecutor();
}
return RowLevelRangeTypeExecuterFactory
@@ -195,7 +199,7 @@ public final class FilterUtil {
case RANGE:
// return true filter expression if filter column min/max is not cached in driver
if (checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(filterExpressionResolverTree,
- segmentProperties, minMaxCacheColumns)) {
+ segmentProperties, minMaxCacheColumns, isStreamDataFile)) {
return new TrueFilterExecutor();
}
return new RangeValueFilterExecuterImpl(
@@ -291,20 +295,21 @@ public final class FilterUtil {
private static boolean checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(
List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
List<MeasureColumnResolvedFilterInfo> msrColEvaluatorInfoList,
- SegmentProperties segmentProperties, List<CarbonColumn> minMaxCacheColumns) {
+ SegmentProperties segmentProperties, List<CarbonColumn> minMaxCacheColumns,
+ boolean isStreamDataFile) {
boolean replaceCurrentNodeWithTrueFilter = false;
ColumnResolvedFilterInfo columnResolvedFilterInfo = null;
if (!msrColEvaluatorInfoList.isEmpty()) {
columnResolvedFilterInfo = msrColEvaluatorInfoList.get(0);
replaceCurrentNodeWithTrueFilter =
checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
- minMaxCacheColumns, true);
+ minMaxCacheColumns, true, isStreamDataFile);
} else {
columnResolvedFilterInfo = dimColEvaluatorInfoList.get(0);
if (!columnResolvedFilterInfo.getDimension().hasEncoding(Encoding.IMPLICIT)) {
replaceCurrentNodeWithTrueFilter =
checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
- minMaxCacheColumns, false);
+ minMaxCacheColumns, false, isStreamDataFile);
}
}
return replaceCurrentNodeWithTrueFilter;
@@ -317,24 +322,25 @@ public final class FilterUtil {
* @param filterExpressionResolverTree
* @param segmentProperties
* @param minMaxCacheColumns
+ * @param isStreamDataFile
* @return
*/
private static boolean checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(
FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
- List<CarbonColumn> minMaxCacheColumns) {
+ List<CarbonColumn> minMaxCacheColumns, boolean isStreamDataFile) {
boolean replaceCurrentNodeWithTrueFilter = false;
ColumnResolvedFilterInfo columnResolvedFilterInfo = null;
if (null != filterExpressionResolverTree.getMsrColResolvedFilterInfo()) {
columnResolvedFilterInfo = filterExpressionResolverTree.getMsrColResolvedFilterInfo();
replaceCurrentNodeWithTrueFilter =
checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
- minMaxCacheColumns, true);
+ minMaxCacheColumns, true, isStreamDataFile);
} else {
columnResolvedFilterInfo = filterExpressionResolverTree.getDimColResolvedFilterInfo();
if (!columnResolvedFilterInfo.getDimension().hasEncoding(Encoding.IMPLICIT)) {
replaceCurrentNodeWithTrueFilter =
checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
- minMaxCacheColumns, false);
+ minMaxCacheColumns, false, isStreamDataFile);
}
}
return replaceCurrentNodeWithTrueFilter;
@@ -352,7 +358,7 @@ public final class FilterUtil {
*/
private static boolean checkIfFilterColumnIsCachedInDriver(
ColumnResolvedFilterInfo columnResolvedFilterInfo, SegmentProperties segmentProperties,
- List<CarbonColumn> minMaxCacheColumns, boolean isMeasure) {
+ List<CarbonColumn> minMaxCacheColumns, boolean isMeasure, boolean isStreamDataFile) {
boolean replaceCurrentNodeWithTrueFilter = false;
CarbonColumn columnFromCurrentBlock = null;
if (isMeasure) {
@@ -377,8 +383,17 @@ public final class FilterUtil {
// if columns to be cached are not specified then in that case all columns will be cached
// and then the ordinal of column will be its index in the min/max byte array
if (isMeasure) {
- columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray(
- segmentProperties.getLastDimensionColOrdinal() + columnFromCurrentBlock.getOrdinal());
+ // when reading from a stream data file, the min/max column cache does not include
+ // complex columns, so 'segmentProperties.getLastDimensionColOrdinal()' cannot be
+ // used as the last dimension ordinal.
+ if (isStreamDataFile) {
+ columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray(
+ segmentProperties.getDimensions().size() + columnFromCurrentBlock.getOrdinal());
+ } else {
+ columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray(
+ segmentProperties.getLastDimensionColOrdinal() + columnFromCurrentBlock
+ .getOrdinal());
+ }
} else {
columnResolvedFilterInfo
.setColumnIndexInMinMaxByteArray(columnFromCurrentBlock.getOrdinal());
@@ -1492,9 +1507,9 @@ public final class FilterUtil {
*/
public static FilterExecuter getFilterExecuterTree(
FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
- Map<Integer, GenericQueryType> complexDimensionInfoMap) {
+ Map<Integer, GenericQueryType> complexDimensionInfoMap, boolean isStreamDataFile) {
return getFilterExecuterTree(filterExpressionResolverTree, segmentProperties,
- complexDimensionInfoMap, null);
+ complexDimensionInfoMap, null, isStreamDataFile);
}
/**
@@ -1507,9 +1522,9 @@ public final class FilterUtil {
public static FilterExecuter getFilterExecuterTree(
FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
Map<Integer, GenericQueryType> complexDimensionInfoMap,
- List<CarbonColumn> minMaxCacheColumns) {
+ List<CarbonColumn> minMaxCacheColumns, boolean isStreamDataFile) {
return createFilterExecuterTree(filterExpressionResolverTree, segmentProperties,
- complexDimensionInfoMap, minMaxCacheColumns);
+ complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile);
}
/**
diff --git a/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
index c92a8a1..e8790ee 100644
--- a/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
+++ b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
@@ -72,7 +72,7 @@ public class StreamPruner {
SegmentProperties segmentProperties =
new SegmentProperties(listOfColumns, columnCardinality);
filterExecuter = FilterUtil.getFilterExecuterTree(
- filterExp, segmentProperties, null, minMaxCacheColumns);
+ filterExp, segmentProperties, null, minMaxCacheColumns, false);
}
}
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
index d32afd9..d860229 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -135,7 +135,7 @@ public class MinMaxIndexDataMap extends CoarseGrainDataMap {
}
} else {
FilterExecuter filterExecuter =
- FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+ FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null, false);
for (int blkIdx = 0; blkIdx < readMinMaxDataMap.length; blkIdx++) {
for (int blkltIdx = 0; blkltIdx < readMinMaxDataMap[blkIdx].length; blkltIdx++) {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
index 020af65..3ea65e5 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -236,8 +236,8 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
FilterResolverIntf resolverIntf = model.getDataMapFilter().getResolver();
- filter =
- FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, complexDimensionInfoMap);
+ filter = FilterUtil.getFilterExecuterTree(
+ resolverIntf, segmentProperties, complexDimensionInfoMap, true);
// for row filter, we need update column index
FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
carbonTable.getDimensionOrdinalMax());
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableQueryFilter.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableQueryFilter.scala
new file mode 100644
index 0000000..f47e18d
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableQueryFilter.scala
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.PrintWriter
+import java.math.BigDecimal
+import java.net.{BindException, ServerSocket}
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestStreamingTableQueryFilter extends QueryTest with BeforeAndAfterAll {
+
+ private val spark = sqlContext.sparkSession
+ private val dataFilePath = s"$resourcesPath/streamSample_with_long_string.csv"
+ private val longStrValue = "abc" * 12000
+
+ override def beforeAll {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+ sql("DROP DATABASE IF EXISTS streaming_table_filter CASCADE")
+ sql("CREATE DATABASE streaming_table_filter")
+ sql("USE streaming_table_filter")
+
+ dropTable()
+ createTableWithComplexType(
+ tableName = "stream_filter", streaming = true, withBatchLoad = true)
+ }
+
+ override def afterAll {
+ dropTable()
+ sql("USE default")
+ sql("DROP DATABASE IF EXISTS streaming_table_filter CASCADE")
+ }
+
+ def dropTable(): Unit = {
+ sql("drop table if exists streaming_table_filter.stream_filter")
+ }
+
+ test("[CARBONDATA-3611] Fix failed when filter with measure columns on stream table when this stream table includes complex columns") {
+ executeStreamingIngest(
+ tableName = "stream_filter",
+ batchNums = 2,
+ rowNumsEachBatch = 25,
+ intervalOfSource = 5,
+ intervalOfIngest = 5,
+ continueSeconds = 20,
+ handoffSize = 51200,
+ autoHandoff = false
+ )
+
+ // non-filter
+ val result = sql("select * from streaming_table_filter.stream_filter order by id, name").collect()
+ assert(result != null)
+ assert(result.length == 55)
+ // check one row of streaming data
+ assert(result(3).getString(1) == "name_4")
+ assert(result(3).getString(9) == ("4" + longStrValue))
+ // check one row of batch loading
+ assert(result(52).getInt(0) == 100000003)
+ assert(result(52).getString(1) == "batch_3")
+ assert(result(52).getString(9) == ("3" + longStrValue))
+ assert(result(52).getStruct(10).getInt(1) == 40)
+
+ // filter
+ checkAnswer(
+ sql("select * from streaming_table_filter.stream_filter where id = 1"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 1))))
+
+ checkAnswer(
+ sql("select * from streaming_table_filter.stream_filter where id > 49 and id < 100000002"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), Row(wrap(Array("school_50", "school_5050")), 50)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 20))))
+
+ checkAnswer(
+ sql("select * from streaming_table_filter.stream_filter where id between 50 and 100000001"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), Row(wrap(Array("school_50", "school_5050")), 50)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 20))))
+
+ checkAnswer(
+ sql("select * from streaming_table_filter.stream_filter where salary = 490000.0 and percent = 80.01"),
+ Seq(Row(49, "name_49", "city_49", 490000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("49" + longStrValue), Row(wrap(Array("school_49", "school_4949")), 49))))
+
+ checkAnswer(
+ sql("select * from streaming_table_filter.stream_filter where id > 20 and salary = 300000.0 and file.age > 25"),
+ Seq(Row(30, "name_30", "city_30", 300000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("30" + longStrValue), Row(wrap(Array("school_30", "school_3030")), 30))))
+ }
+
+ def createWriteSocketThread(
+ serverSocket: ServerSocket,
+ writeNums: Int,
+ rowNums: Int,
+ intervalSecond: Int): Thread = {
+ new Thread() {
+ override def run(): Unit = {
+ // wait for the client's connection request and accept it
+ val clientSocket = serverSocket.accept()
+ val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+ var index = 0
+ for (_ <- 1 to writeNums) {
+ // write `rowNums` records per iteration
+ val stringBuilder = new StringBuilder()
+ for (_ <- 1 to rowNums) {
+ index = index + 1
+ stringBuilder.append(index.toString + ",name_" + index
+ + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+ ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01," +
+ index.toString() + ("abc" * 12000) +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ stringBuilder.append("\n")
+ }
+ socketWriter.append(stringBuilder.toString())
+ socketWriter.flush()
+ Thread.sleep(1000 * intervalSecond)
+ }
+ socketWriter.close()
+ }
+ }
+ }
+
+ def createSocketStreamingThread(
+ spark: SparkSession,
+ port: Int,
+ carbonTable: CarbonTable,
+ tableIdentifier: TableIdentifier,
+ intervalSecond: Int = 2,
+ handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+ autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+ ): Thread = {
+ new Thread() {
+ override def run(): Unit = {
+ var qry: StreamingQuery = null
+ try {
+ import spark.implicits._
+ val readSocketDF = spark.readStream
+ .format("socket")
+ .option("host", "localhost")
+ .option("port", port)
+ .load().as[String]
+ .map(_.split(","))
+ .map { fields => {
+ val tmp = fields(10).split("\\$")
+ val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+ StreamLongStrData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat,
+ BigDecimal.valueOf(fields(4).toDouble), fields(5).toDouble,
+ fields(6), fields(7), fields(8), fields(9), file)
+ } }
+
+ // Write data from socket stream to carbondata file
+ // repartition to simulate an empty partition when readSocketDF has only one row
+ qry = readSocketDF.repartition(2).writeStream
+ .format("carbondata")
+ .trigger(ProcessingTime(s"$intervalSecond seconds"))
+ .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+ .option("dbName", tableIdentifier.database.get)
+ .option("tableName", tableIdentifier.table)
+ .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
+ .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
+ .start()
+ qry.awaitTermination()
+ } catch {
+ case ex: Throwable =>
+ LOGGER.error(ex.getMessage)
+ throw new Exception(ex.getMessage, ex)
+ } finally {
+ if (null != qry) {
+ qry.stop()
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * start ingestion threads: write `rowNumsEachBatch` rows repeatedly for `batchNums` times.
+ */
+ def executeStreamingIngest(
+ tableName: String,
+ batchNums: Int,
+ rowNumsEachBatch: Int,
+ intervalOfSource: Int,
+ intervalOfIngest: Int,
+ continueSeconds: Int,
+ handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+ autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+ ): Unit = {
+ val identifier = new TableIdentifier(tableName, Option("streaming_table_filter"))
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetaStore.lookupRelation(identifier)(spark)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable
+ var server: ServerSocket = null
+ try {
+ server = getServerSocket()
+ val thread1 = createWriteSocketThread(
+ serverSocket = server,
+ writeNums = batchNums,
+ rowNums = rowNumsEachBatch,
+ intervalSecond = intervalOfSource)
+ val thread2 = createSocketStreamingThread(
+ spark = spark,
+ port = server.getLocalPort,
+ carbonTable = carbonTable,
+ tableIdentifier = identifier,
+ intervalSecond = intervalOfIngest,
+ handoffSize = handoffSize,
+ autoHandoff = autoHandoff)
+ thread1.start()
+ thread2.start()
+ Thread.sleep(continueSeconds * 1000)
+ thread2.interrupt()
+ thread1.interrupt()
+ } finally {
+ if (null != server) {
+ server.close()
+ }
+ }
+ }
+
+ def createTableWithComplexType(
+ tableName: String,
+ streaming: Boolean,
+ withBatchLoad: Boolean): Unit = {
+ sql(
+ s"""
+ | CREATE TABLE streaming_table_filter.$tableName(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | tax DECIMAL(8,2),
+ | percent double,
+ | birthday DATE,
+ | register TIMESTAMP,
+ | updated TIMESTAMP,
+ | longstr STRING,
+ | file struct<school:array<string>, age:int>
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+ | 'sort_columns'='name', 'dictionary_include'='name,tax,percent,updated', 'LONG_STRING_COLUMNS'='longstr')
+ | """.stripMargin)
+
+ if (withBatchLoad) {
+ // batch loading 5 rows
+ executeBatchLoad(tableName)
+ }
+ }
+
+ def executeBatchLoad(tableName: String): Unit = {
+ sql(
+ s"LOAD DATA LOCAL INPATH '$dataFilePath' INTO TABLE streaming_table_filter.$tableName OPTIONS" +
+ "('HEADER'='true','COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
+ }
+
+ def wrap(array: Array[String]) = {
+ new mutable.WrappedArray.ofRef(array)
+ }
+
+ /**
+ * get a ServerSocket
+ * if the address is already in use, it will retry with a new port number.
+ *
+ * @return ServerSocket
+ */
+ def getServerSocket(): ServerSocket = {
+ var port = 7071
+ var serverSocket: ServerSocket = null
+ var retry = false
+ do {
+ try {
+ retry = false
+ serverSocket = new ServerSocket(port)
+ } catch {
+ case ex: BindException =>
+ retry = true
+ port = port + 2
+ if (port >= 65535) {
+ throw ex
+ }
+ }
+ } while (retry)
+ serverSocket
+ }
+}