Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/09/05 06:22:26 UTC
[1/2] carbondata git commit: [CARBONDATA-2853] Implement min/max index for streaming segment
Repository: carbondata
Updated Branches:
refs/heads/master 526e3bfa1 -> 21a72bf2e
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 9e83924..89f00c9 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -19,7 +19,9 @@ package org.apache.carbondata.streaming.segment;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.CarbonIterator;
@@ -29,21 +31,31 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletIndex;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.streaming.CarbonStreamRecordWriter;
+import org.apache.carbondata.streaming.index.StreamFileIndex;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -221,10 +233,48 @@ public class StreamSegment {
}
}
+ public static BlockletMinMaxIndex collectMinMaxIndex(SimpleStatsResult[] dimStats,
+ SimpleStatsResult[] mrsStats) {
+ BlockletMinMaxIndex minMaxIndex = new BlockletMinMaxIndex();
+ byte[][] maxIndexes = new byte[dimStats.length + mrsStats.length][];
+ for (int index = 0; index < dimStats.length; index++) {
+ maxIndexes[index] =
+ CarbonUtil.getValueAsBytes(dimStats[index].getDataType(), dimStats[index].getMax());
+ }
+ for (int index = 0; index < mrsStats.length; index++) {
+ maxIndexes[dimStats.length + index] =
+ CarbonUtil.getValueAsBytes(mrsStats[index].getDataType(), mrsStats[index].getMax());
+ }
+ minMaxIndex.setMaxValues(maxIndexes);
+
+ byte[][] minIndexes = new byte[maxIndexes.length][];
+ for (int index = 0; index < dimStats.length; index++) {
+ minIndexes[index] =
+ CarbonUtil.getValueAsBytes(dimStats[index].getDataType(), dimStats[index].getMin());
+ }
+ for (int index = 0; index < mrsStats.length; index++) {
+ minIndexes[dimStats.length + index] =
+ CarbonUtil.getValueAsBytes(mrsStats[index].getDataType(), mrsStats[index].getMin());
+ }
+ minMaxIndex.setMinValues(minIndexes);
+ return minMaxIndex;
+ }
+
+ /**
+ * create a StreamFileIndex for a stream file from its blocklet min/max index
+ */
+ private static StreamFileIndex createStreamBlockIndex(String fileName,
+ BlockletMinMaxIndex minMaxIndex, DataType[] msrDataTypes, int blockletRowCount) {
+ StreamFileIndex streamFileIndex =
+ new StreamFileIndex(fileName, minMaxIndex, blockletRowCount);
+ streamFileIndex.setMsrDataTypes(msrDataTypes);
+ return streamFileIndex;
+ }
+
/**
* invoke CarbonStreamOutputFormat to append batch data to existing carbondata file
*/
- public static void appendBatchData(CarbonIterator<Object[]> inputIterators,
+ public static StreamFileIndex appendBatchData(CarbonIterator<Object[]> inputIterators,
TaskAttemptContext job, CarbonLoadModel carbonLoadModel) throws Exception {
CarbonStreamRecordWriter writer = null;
try {
@@ -235,11 +285,15 @@ public class StreamSegment {
writer.getSegmentDir(),
writer.getFileName(),
CarbonTablePath.getCarbonStreamIndexFileName());
-
+ int blockletRowCount = 0;
while (inputIterators.hasNext()) {
writer.write(null, inputIterators.next());
+ blockletRowCount++;
}
inputIterators.close();
+
+ return createStreamBlockIndex(writer.getFileName(), writer.getBatchMinMaxIndex(),
+ writer.getMeasureDataTypes(), blockletRowCount);
} catch (Throwable ex) {
if (writer != null) {
LOGGER.error(ex, "Failed to append batch data to stream segment: " +
@@ -331,6 +385,7 @@ public class StreamSegment {
} else if (blockIndex.getFile_size() < file.getSize()) {
FileFactory.truncateFile(filePath, fileType, blockIndex.getFile_size());
}
+ break;
}
}
} finally {
@@ -357,11 +412,208 @@ public class StreamSegment {
}
/**
- * update carbonindex file after a stream batch.
+ * read index file to list BlockIndex
+ *
+ * @param indexPath path of the index file
+ * @param fileType file type of the index file
+ * @return the list of BlockIndex in the index file
+ * @throws IOException
+ */
+ private static List<BlockIndex> readIndexFile(String indexPath, FileFactory.FileType fileType)
+ throws IOException {
+ List<BlockIndex> blockIndexList = new ArrayList<>();
+ CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
+ if (index.exists()) {
+ CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+ try {
+ indexReader.openThriftReader(indexPath);
+ while (indexReader.hasNext()) {
+ blockIndexList.add(indexReader.readBlockIndexInfo());
+ }
+ } finally {
+ indexReader.closeThriftReader();
+ }
+ }
+ return blockIndexList;
+ }
+
+ /**
+ * combine the min/max index of the new blocklet with the min/max index of the stream file
+ * 1. if the file index is null, no min/max index is required
+ * 2. if the file index is not null,
+ * 2.1 if the blocklet index is null, use the min/max index of the stream file
+ * 2.2 if the blocklet index is not null, combine the two indexes
+ */
+ private static void mergeBatchMinMax(StreamFileIndex blockletIndex, BlockletMinMaxIndex fileIndex)
+ throws IOException {
+ if (fileIndex == null) {
+ // backward compatibility
+ // it will not create a min/max index for an old stream file (one without a min/max index).
+ blockletIndex.setMinMaxIndex(null);
+ return;
+ }
+
+ DataType[] msrDataTypes = blockletIndex.getMsrDataTypes();
+ SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
+ for (int index = 0; index < comparators.length; index++) {
+ comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
+ }
+
+ // min value
+ byte[][] minValues = blockletIndex.getMinMaxIndex().getMinValues();
+ byte[][] mergedMinValues = fileIndex.getMinValues();
+ if (minValues == null || minValues.length == 0) {
+ // use file index
+ blockletIndex.getMinMaxIndex().setMinValues(mergedMinValues);
+ } else if (mergedMinValues != null && mergedMinValues.length != 0) {
+ if (minValues.length != mergedMinValues.length) {
+ throw new IOException("the lengths of the min values should be same.");
+ }
+ int dimCount = minValues.length - msrDataTypes.length;
+ for (int index = 0; index < minValues.length; index++) {
+ if (index < dimCount) {
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
+ > 0) {
+ minValues[index] = mergedMinValues[index];
+ }
+ } else {
+ Object object = DataTypeUtil.getMeasureObjectFromDataType(
+ minValues[index], msrDataTypes[index - dimCount]);
+ Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+ mergedMinValues[index], msrDataTypes[index - dimCount]);
+ if (comparators[index - dimCount].compare(object, mergedObject) > 0) {
+ minValues[index] = mergedMinValues[index];
+ }
+ }
+ }
+ }
+
+ // max value
+ byte[][] maxValues = blockletIndex.getMinMaxIndex().getMaxValues();
+ byte[][] mergedMaxValues = fileIndex.getMaxValues();
+ if (maxValues == null || maxValues.length == 0) {
+ blockletIndex.getMinMaxIndex().setMaxValues(mergedMaxValues);
+ } else if (mergedMaxValues != null && mergedMaxValues.length != 0) {
+ if (maxValues.length != mergedMaxValues.length) {
+ throw new IOException("the lengths of the max values should be same.");
+ }
+ int dimCount = maxValues.length - msrDataTypes.length;
+ for (int index = 0; index < maxValues.length; index++) {
+ if (index < dimCount) {
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
+ < 0) {
+ maxValues[index] = mergedMaxValues[index];
+ }
+ } else {
+ Object object = DataTypeUtil.getMeasureObjectFromDataType(
+ maxValues[index], msrDataTypes[index - dimCount]);
+ Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+ mergedMaxValues[index], msrDataTypes[index - dimCount]);
+ if (comparators[index - dimCount].compare(object, mergedObject) < 0) {
+ maxValues[index] = mergedMaxValues[index];
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * merge blocklet min/max to generate batch min/max
+ */
+ public static BlockletMinMaxIndex mergeBlockletMinMax(BlockletMinMaxIndex to,
+ BlockletMinMaxIndex from, DataType[] msrDataTypes) {
+ if (to == null) {
+ return from;
+ }
+ if (from == null) {
+ return to;
+ }
+
+ SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
+ for (int index = 0; index < comparators.length; index++) {
+ comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
+ }
+
+ // min value
+ byte[][] minValues = to.getMinValues();
+ byte[][] mergedMinValues = from.getMinValues();
+ int dimCount1 = minValues.length - msrDataTypes.length;
+ for (int index = 0; index < minValues.length; index++) {
+ if (index < dimCount1) {
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
+ > 0) {
+ minValues[index] = mergedMinValues[index];
+ }
+ } else {
+ Object object = DataTypeUtil.getMeasureObjectFromDataType(
+ minValues[index], msrDataTypes[index - dimCount1]);
+ Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+ mergedMinValues[index], msrDataTypes[index - dimCount1]);
+ if (comparators[index - dimCount1].compare(object, mergedObject) > 0) {
+ minValues[index] = mergedMinValues[index];
+ }
+ }
+ }
+
+ // max value
+ byte[][] maxValues = to.getMaxValues();
+ byte[][] mergedMaxValues = from.getMaxValues();
+ int dimCount2 = maxValues.length - msrDataTypes.length;
+ for (int index = 0; index < maxValues.length; index++) {
+ if (index < dimCount2) {
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
+ < 0) {
+ maxValues[index] = mergedMaxValues[index];
+ }
+ } else {
+ Object object = DataTypeUtil.getMeasureObjectFromDataType(
+ maxValues[index], msrDataTypes[index - dimCount2]);
+ Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+ mergedMaxValues[index], msrDataTypes[index - dimCount2]);
+ if (comparators[index - dimCount2].compare(object, mergedObject) < 0) {
+ maxValues[index] = mergedMaxValues[index];
+ }
+ }
+ }
+ return to;
+ }
+
+ /**
+ * merge new blocklet index and old file index to create new file index
*/
- public static void updateIndexFile(String segmentDir) throws IOException {
+ private static void updateStreamFileIndex(Map<String, StreamFileIndex> indexMap,
+ String indexPath, FileFactory.FileType fileType) throws IOException {
+ List<BlockIndex> blockIndexList = readIndexFile(indexPath, fileType);
+ for (BlockIndex blockIndex : blockIndexList) {
+ BlockletMinMaxIndex fileIndex = CarbonMetadataUtil
+ .convertExternalMinMaxIndex(blockIndex.getBlock_index().getMin_max_index());
+ StreamFileIndex blockletIndex = indexMap.get(blockIndex.getFile_name());
+ if (blockletIndex == null) {
+ // should index all stream files
+ indexMap.put(blockIndex.getFile_name(),
+ new StreamFileIndex(blockIndex.getFile_name(), fileIndex, blockIndex.getNum_rows()));
+ } else {
+ // merge the min/max index into the existing StreamFileIndex
+ blockletIndex.setRowCount(blockletIndex.getRowCount() + blockIndex.getNum_rows());
+ mergeBatchMinMax(blockletIndex, fileIndex);
+ }
+ }
+ }
+
+ /**
+ * update carbon index file after a stream batch.
+ */
+ public static void updateIndexFile(String segmentDir,
+ StreamFileIndex[] blockIndexes) throws IOException {
FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
String filePath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
+ // update min/max index
+ Map<String, StreamFileIndex> indexMap = new HashMap<>();
+ for (StreamFileIndex fileIndex : blockIndexes) {
+ indexMap.put(fileIndex.getFileName(), fileIndex);
+ }
+ updateStreamFileIndex(indexMap, filePath, fileType);
+
String tempFilePath = filePath + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
try {
@@ -372,10 +624,19 @@ public class StreamSegment {
blockIndex = new BlockIndex();
blockIndex.setFile_name(file.getName());
blockIndex.setFile_size(file.getSize());
- // TODO need to collect these information
- blockIndex.setNum_rows(-1);
blockIndex.setOffset(-1);
- blockIndex.setBlock_index(new BlockletIndex());
+ // set min/max index
+ BlockletIndex blockletIndex = new BlockletIndex();
+ blockIndex.setBlock_index(blockletIndex);
+ StreamFileIndex streamFileIndex = indexMap.get(blockIndex.getFile_name());
+ if (streamFileIndex != null) {
+ blockletIndex.setMin_max_index(
+ CarbonMetadataUtil.convertMinMaxIndex(streamFileIndex.getMinMaxIndex()));
+ blockIndex.setNum_rows(streamFileIndex.getRowCount());
+ } else {
+ blockIndex.setNum_rows(-1);
+ }
+ // write block index
writer.writeThrift(blockIndex);
}
writer.close();
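For readers following the merge helpers above (mergeBatchMinMax and mergeBlockletMinMax): dimension columns are compared as unsigned byte arrays, while measure columns are first decoded to typed values and compared with a type-aware comparator. Below is a minimal standalone sketch of that rule with hypothetical names and plain Java types instead of the CarbonData ByteUtil/DataTypeUtil helpers; it assumes Java 9+ for Arrays.compareUnsigned. The max-value merge is symmetric, so only the min side is shown.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class MinMaxMergeSketch {

  // Merge one batch's min values into the running file-level min values.
  // The first dimCount entries are dimensions (raw byte[] keys); the rest are measures
  // that are decoded to long values for a numeric comparison (illustration only).
  static void mergeMinValues(byte[][] fileMin, byte[][] batchMin, int dimCount) {
    for (int i = 0; i < fileMin.length; i++) {
      if (i < dimCount) {
        // dimension rule: byte-wise unsigned comparison, keep the smaller value
        if (Arrays.compareUnsigned(fileMin[i], batchMin[i]) > 0) {
          fileMin[i] = batchMin[i];
        }
      } else {
        // measure rule: decode first, then compare numerically
        if (decodeMeasure(batchMin[i]) < decodeMeasure(fileMin[i])) {
          fileMin[i] = batchMin[i];
        }
      }
    }
  }

  // Hypothetical decoder; CarbonData uses DataTypeUtil.getMeasureObjectFromDataType instead.
  static long decodeMeasure(byte[] value) {
    return Long.parseLong(new String(value, StandardCharsets.UTF_8));
  }
}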
[2/2] carbondata git commit: [CARBONDATA-2853] Implement min/max index for streaming segment
Posted by ja...@apache.org.
[CARBONDATA-2853] Implement min/max index for streaming segment
Implement a file-level min/max index (driver side) and a blocklet-level min/max index (worker side) on stream files to improve read performance.
This closes #2644
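The pruning relies on each stream file's index entry now carrying per-column min/max values: at planning time the driver compares the filter literal against these bounds and skips files that cannot match. The snippet below is a minimal self-contained illustration of that check using unsigned byte comparison (Java 9+ Arrays.compareUnsigned); it is not the FilterExecuter.isScanRequired call that StreamPruner actually uses.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class MinMaxPruningSketch {

  // A stream file only needs to be scanned when the filter value lies inside [min, max].
  static boolean mayContain(byte[] min, byte[] max, byte[] filterValue) {
    return Arrays.compareUnsigned(min, filterValue) <= 0
        && Arrays.compareUnsigned(filterValue, max) <= 0;
  }

  public static void main(String[] args) {
    byte[] min = "city_1".getBytes(StandardCharsets.UTF_8);
    byte[] max = "city_50".getBytes(StandardCharsets.UTF_8);
    byte[] filter = "city_99".getBytes(StandardCharsets.UTF_8);
    // prints false: a query filtering on city = 'city_99' never reads this stream file
    System.out.println(mayContain(min, max, filter));
  }
}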
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/21a72bf2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/21a72bf2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/21a72bf2
Branch: refs/heads/master
Commit: 21a72bf2efa9d76c38d7da92b7bd4eaa5cd4a3ea
Parents: 526e3bf
Author: QiangCai <qi...@qq.com>
Authored: Sat Aug 25 17:18:16 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Sep 5 14:22:03 2018 +0800
----------------------------------------------------------------------
.../carbondata/core/stream/StreamFile.java | 63 +++++
.../carbondata/core/stream/StreamPruner.java | 143 ++++++++++
.../core/util/CarbonMetadataUtil.java | 37 ++-
.../hadoop/api/CarbonTableInputFormat.java | 111 ++++----
.../carbondata/spark/rdd/StreamHandoffRDD.scala | 6 +-
.../streaming/CarbonAppendableStreamSink.scala | 18 +-
.../TestStreamingTableOperation.scala | 96 +++++++
.../streaming/CarbonStreamRecordReader.java | 16 +-
.../streaming/CarbonStreamRecordWriter.java | 48 +++-
.../streaming/StreamBlockletWriter.java | 87 +++++-
.../streaming/index/StreamFileIndex.java | 77 ++++++
.../streaming/segment/StreamSegment.java | 275 ++++++++++++++++++-
12 files changed, 883 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/core/src/main/java/org/apache/carbondata/core/stream/StreamFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stream/StreamFile.java b/core/src/main/java/org/apache/carbondata/core/stream/StreamFile.java
new file mode 100644
index 0000000..64c916f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/stream/StreamFile.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.stream;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+
+@InterfaceAudience.Internal
+public class StreamFile {
+
+ private String segmentNo;
+
+ private String filePath;
+
+ private long fileSize;
+
+ private BlockletMinMaxIndex minMaxIndex;
+
+ public StreamFile(String segmentNo, String filePath, long fileSize) {
+ this.segmentNo = segmentNo;
+ this.filePath = filePath;
+ this.fileSize = fileSize;
+ }
+
+ public String getSegmentNo() {
+ return segmentNo;
+ }
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public long getFileSize() {
+ return fileSize;
+ }
+
+ public BlockletMinMaxIndex getMinMaxIndex() {
+ return minMaxIndex;
+ }
+
+ public void setMinMaxIndex(BlockletMinMaxIndex minMaxIndex) {
+ this.minMaxIndex = minMaxIndex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
new file mode 100644
index 0000000..ac3589f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.stream;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+
+@InterfaceAudience.Internal
+public class StreamPruner {
+
+ private CarbonTable carbonTable;
+ private FilterExecuter filterExecuter;
+
+ public StreamPruner(CarbonTable carbonTable) {
+ this.carbonTable = carbonTable;
+ }
+
+ public void init(FilterResolverIntf filterExp) {
+ if (filterExp != null) {
+ // cache all columns
+ List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
+ for (CarbonDimension dimension : carbonTable.getDimensions()) {
+ if (!dimension.isComplex()) {
+ minMaxCacheColumns.add(dimension);
+ }
+ }
+ minMaxCacheColumns.addAll(carbonTable.getMeasures());
+ // prepare cardinality of all dimensions
+ List<ColumnSchema> listOfColumns =
+ carbonTable.getTableInfo().getFactTable().getListOfColumns();
+ int[] columnCardinality = new int[listOfColumns.size()];
+ for (int index = 0; index < columnCardinality.length; index++) {
+ columnCardinality[index] = Integer.MAX_VALUE;
+ }
+ // initialize the filter executor
+ SegmentProperties segmentProperties =
+ new SegmentProperties(listOfColumns, columnCardinality);
+ filterExecuter = FilterUtil.getFilterExecuterTree(
+ filterExp, segmentProperties, null, minMaxCacheColumns);
+ }
+ }
+
+ public List<StreamFile> prune(List<Segment> segments) throws IOException {
+ if (filterExecuter == null) {
+ // if the filter is null, list all stream files
+ return listAllStreamFiles(segments, false);
+ } else {
+ List<StreamFile> streamFileList = new ArrayList<>();
+ for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
+ if (isScanRequire(streamFile)) {
+ // the stream file may contain matching rows, keep it
+ streamFileList.add(streamFile);
+ streamFile.setMinMaxIndex(null);
+ }
+ }
+ return streamFileList;
+ }
+ }
+
+ private boolean isScanRequire(StreamFile streamFile) {
+ // backward compatibility, old stream file without min/max index
+ if (streamFile.getMinMaxIndex() == null) {
+ return true;
+ }
+ byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
+ byte[][] minValue = streamFile.getMinMaxIndex().getMinValues();
+ BitSet bitSet = filterExecuter.isScanRequired(maxValue, minValue);
+ if (!bitSet.isEmpty()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ // TODO optimize and move this code to StreamSegment, but that class is in the streaming module.
+ private List<StreamFile> listAllStreamFiles(List<Segment> segments, boolean withMinMax)
+ throws IOException {
+ List<StreamFile> streamFileList = new ArrayList<>();
+ for (Segment segment : segments) {
+ String segmentDir = CarbonTablePath.getSegmentPath(
+ carbonTable.getAbsoluteTableIdentifier().getTablePath(), segment.getSegmentNo());
+ String indexFile = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
+ FileFactory.FileType fileType = FileFactory.getFileType(indexFile);
+ if (FileFactory.isFileExist(indexFile, fileType)) {
+ CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+ indexReader.openThriftReader(indexFile);
+ try {
+ while (indexReader.hasNext()) {
+ BlockIndex blockIndex = indexReader.readBlockIndexInfo();
+ String filePath = segmentDir + File.separator + blockIndex.getFile_name();
+ long length = blockIndex.getFile_size();
+ StreamFile streamFile = new StreamFile(segment.getSegmentNo(), filePath, length);
+ streamFileList.add(streamFile);
+ if (withMinMax) {
+ if (blockIndex.getBlock_index() != null
+ && blockIndex.getBlock_index().getMin_max_index() != null) {
+ streamFile.setMinMaxIndex(CarbonMetadataUtil
+ .convertExternalMinMaxIndex(blockIndex.getBlock_index().getMin_max_index()));
+ }
+ }
+ }
+ } finally {
+ indexReader.closeThriftReader();
+ }
+ }
+ }
+ return streamFileList;
+ }
+}
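For reference, this class is driven from CarbonTableInputFormat (see the changes further below). A condensed usage sketch based only on the signatures visible in this diff; the table, filter resolver and segment list are assumed to be provided by the caller.

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.stream.StreamFile;
import org.apache.carbondata.core.stream.StreamPruner;

public class StreamPrunerUsageSketch {

  // Returns only the stream files that may contain rows matching the filter.
  // A null filter means no pruning: every stream file of the given segments is listed.
  static List<StreamFile> pruneStreamFiles(CarbonTable table, FilterResolverIntf filterExp,
      List<Segment> streamSegments) throws IOException {
    StreamPruner streamPruner = new StreamPruner(table);
    streamPruner.init(filterExp);
    return streamPruner.prune(streamSegments);
  }
}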
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 70443d8..571a247 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -96,14 +96,41 @@ public class CarbonMetadataUtil {
return footer;
}
- public static BlockletIndex getBlockletIndex(
- org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex info) {
+ /**
+ * convert external thrift BlockletMinMaxIndex to BlockletMinMaxIndex of carbon metadata
+ */
+ public static org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex
+ convertExternalMinMaxIndex(BlockletMinMaxIndex minMaxIndex) {
+ if (minMaxIndex == null) {
+ return null;
+ }
+
+ return new org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex(
+ minMaxIndex.getMin_values(), minMaxIndex.getMax_values());
+ }
+
+ /**
+ * convert BlockletMinMaxIndex of carbon metadata to external thrift BlockletMinMaxIndex
+ */
+ public static BlockletMinMaxIndex convertMinMaxIndex(
+ org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex minMaxIndex) {
+ if (minMaxIndex == null) {
+ return null;
+ }
+
BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
- for (int i = 0; i < info.getMinMaxIndex().getMaxValues().length; i++) {
- blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(info.getMinMaxIndex().getMaxValues()[i]));
- blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(info.getMinMaxIndex().getMinValues()[i]));
+ for (int i = 0; i < minMaxIndex.getMaxValues().length; i++) {
+ blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(minMaxIndex.getMaxValues()[i]));
+ blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(minMaxIndex.getMinValues()[i]));
}
+
+ return blockletMinMaxIndex;
+ }
+
+ public static BlockletIndex getBlockletIndex(
+ org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex info) {
+ BlockletMinMaxIndex blockletMinMaxIndex = convertMinMaxIndex(info.getMinMaxIndex());
BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
blockletBTreeIndex.setStart_key(info.getBtreeIndex().getStartKey());
blockletBTreeIndex.setEnd_key(info.getBtreeIndex().getEndKey());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ec201b9..4f85975 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -17,7 +17,6 @@
package org.apache.carbondata.hadoop.api;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -34,7 +33,6 @@ import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
@@ -48,7 +46,6 @@ import org.apache.carbondata.core.mutate.data.BlockMappingVO;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
-import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -56,9 +53,10 @@ import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.stream.StreamFile;
+import org.apache.carbondata.core.stream.StreamPruner;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.commons.logging.Log;
@@ -158,12 +156,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
streamSegments = segments.getStreamSegments();
streamSegments = getFilteredSegment(job, streamSegments, true, readCommittedScope);
if (validSegments.size() == 0) {
- return getSplitsOfStreaming(job, identifier, streamSegments);
+ return getSplitsOfStreaming(job, streamSegments, carbonTable);
}
List<Segment> filteredSegmentToAccess =
getFilteredSegment(job, segments.getValidSegments(), true, readCommittedScope);
if (filteredSegmentToAccess.size() == 0) {
- return getSplitsOfStreaming(job, identifier, streamSegments);
+ return getSplitsOfStreaming(job, streamSegments, carbonTable);
} else {
setSegmentsToAccess(job.getConfiguration(), filteredSegmentToAccess);
}
@@ -173,7 +171,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
getSegmentsToAccess(job, readCommittedScope));
streamSegments = segments.getStreamSegments();
if (filteredNormalSegments.size() == 0) {
- return getSplitsOfStreaming(job, identifier, streamSegments);
+ return getSplitsOfStreaming(job, streamSegments, carbonTable);
}
setSegmentsToAccess(job.getConfiguration(),filteredNormalSegments);
}
@@ -231,7 +229,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
}
// add all splits of streaming
- List<InputSplit> splitsOfStreaming = getSplitsOfStreaming(job, identifier, streamSegments);
+ List<InputSplit> splitsOfStreaming =
+ getSplitsOfStreaming(job, streamSegments, carbonTable, filterInterface);
if (!splitsOfStreaming.isEmpty()) {
splits.addAll(splitsOfStreaming);
}
@@ -339,64 +338,60 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
return filteredSegmentToAccess;
}
+ public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
+ CarbonTable carbonTable) throws IOException {
+ return getSplitsOfStreaming(job, streamSegments, carbonTable, null);
+ }
+
/**
* use file list in .carbonindex file to get the split of streaming.
*/
- public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
- List<Segment> streamSegments) throws IOException {
+ public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
+ CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
if (streamSegments != null && !streamSegments.isEmpty()) {
numStreamSegments = streamSegments.size();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
- for (Segment segment : streamSegments) {
- String segmentDir =
- CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
- FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
- if (FileFactory.isFileExist(segmentDir, fileType)) {
- SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
- segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
- Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
- CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
- for (byte[] fileData : carbonIndexMap.values()) {
- indexReader.openThriftReader(fileData);
- try {
- // map block index
- while (indexReader.hasNext()) {
- BlockIndex blockIndex = indexReader.readBlockIndexInfo();
- String filePath = segmentDir + File.separator + blockIndex.getFile_name();
- Path path = new Path(filePath);
- long length = blockIndex.getFile_size();
- if (length != 0) {
- BlockLocation[] blkLocations;
- FileSystem fs = FileFactory.getFileSystem(path);
- FileStatus file = fs.getFileStatus(path);
- blkLocations = fs.getFileBlockLocations(path, 0, length);
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(blockSize, minSize, maxSize);
- long bytesRemaining = length;
- while (((double) bytesRemaining) / splitSize > 1.1) {
- int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
- splitSize, blkLocations[blkIndex].getHosts(),
- blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
- bytesRemaining -= splitSize;
- }
- if (bytesRemaining != 0) {
- int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
- bytesRemaining, blkLocations[blkIndex].getHosts(),
- blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
- }
- } else {
- //Create empty hosts array for zero length files
- splits.add(makeSplit(segment.getSegmentNo(), path, 0, length, new String[0],
- FileFormat.ROW_V1));
- }
- }
- } finally {
- indexReader.closeThriftReader();
- }
+ if (filterResolverIntf == null) {
+ if (carbonTable != null) {
+ Expression filter = getFilterPredicates(job.getConfiguration());
+ if (filter != null) {
+ carbonTable.processFilterExpression(filter, null, null);
+ filterResolverIntf = carbonTable.resolveFilter(filter);
+ }
+ }
+ }
+ StreamPruner streamPruner = new StreamPruner(carbonTable);
+ streamPruner.init(filterResolverIntf);
+ List<StreamFile> streamFiles = streamPruner.prune(streamSegments);
+
+ for (StreamFile streamFile : streamFiles) {
+ Path path = new Path(streamFile.getFilePath());
+ long length = streamFile.getFileSize();
+ if (length != 0) {
+ BlockLocation[] blkLocations;
+ FileSystem fs = FileFactory.getFileSystem(path);
+ FileStatus file = fs.getFileStatus(path);
+ blkLocations = fs.getFileBlockLocations(path, 0, length);
+ long blockSize = file.getBlockSize();
+ long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+ long bytesRemaining = length;
+ // split the stream file into smaller splits
+ // allow 10% slop to avoid generating a very small split at the end
+ while (((double) bytesRemaining) / splitSize > 1.1) {
+ int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(
+ makeSplit(streamFile.getSegmentNo(), path, length - bytesRemaining,
+ splitSize, blkLocations[blkIndex].getHosts(),
+ blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
+ bytesRemaining -= splitSize;
+ }
+ if (bytesRemaining != 0) {
+ int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(makeSplit(streamFile.getSegmentNo(), path, length - bytesRemaining,
+ bytesRemaining, blkLocations[blkIndex].getHosts(),
+ blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
}
}
}
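The loop above keeps the FileInputFormat-style sizing the removed code already used: a full-size split is cut off only while more than 1.1 splits of data remain, so the trailing split never becomes tiny. A small self-contained sketch of just that arithmetic (hypothetical helper, not part of CarbonTableInputFormat):

import java.util.ArrayList;
import java.util.List;

public class StreamSplitSizingSketch {

  // Returns the (offset, length) pairs a stream file of the given length is cut into.
  static List<long[]> planSplits(long fileLength, long splitSize) {
    List<long[]> splits = new ArrayList<>();
    long bytesRemaining = fileLength;
    // 10% slop: only cut a full split while more than 1.1 splits remain
    while (((double) bytesRemaining) / splitSize > 1.1) {
      splits.add(new long[] { fileLength - bytesRemaining, splitSize });
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      splits.add(new long[] { fileLength - bytesRemaining, bytesRemaining });
    }
    return splits;
  }

  public static void main(String[] args) {
    // e.g. a 300 MB stream file with a 128 MB split size -> 128 MB, 128 MB, 44 MB
    for (long[] split : planSplits(300L << 20, 128L << 20)) {
      System.out.println("offset=" + split[0] + " length=" + split[1]);
    }
  }
}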
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index b63e0dc..994cb3d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -205,8 +205,8 @@ class StreamHandoffRDD[K, V](
segmentList.add(Segment.toSegment(handOffSegmentId, null))
val splits = inputFormat.getSplitsOfStreaming(
job,
- carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
- segmentList
+ segmentList,
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
)
(0 until splits.size()).map { index =>
@@ -334,9 +334,9 @@ object StreamHandoffRDD {
} catch {
case ex: Exception =>
loadStatus = SegmentStatus.LOAD_FAILURE
+ LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
errorMessage = errorMessage + ": " + ex.getCause.getMessage
LOGGER.error(errorMessage)
- LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
}
if (loadStatus == SegmentStatus.LOAD_FAILURE) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index ffaac86..196baa6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.streaming
import java.util.Date
-import scala.collection.JavaConverters._
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -49,6 +47,7 @@ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePost
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.rdd.StreamHandoffRDD
import org.apache.carbondata.streaming.{CarbonStreamException, CarbonStreamOutputFormat}
+import org.apache.carbondata.streaming.index.StreamFileIndex
import org.apache.carbondata.streaming.parser.CarbonStreamParser
import org.apache.carbondata.streaming.segment.StreamSegment
@@ -236,7 +235,7 @@ object CarbonAppendableStreamSink {
// run write data file job
SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
- var result: Array[TaskCommitMessage] = null
+ var result: Array[(TaskCommitMessage, StreamFileIndex)] = null
try {
committer.setupJob(job)
// initialize dictionary server
@@ -275,7 +274,8 @@ object CarbonAppendableStreamSink {
// update data file info in index file
StreamSegment.updateIndexFile(
- CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId))
+ CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId),
+ result.map(_._2))
} catch {
// catch fault of executor side
@@ -286,7 +286,7 @@ object CarbonAppendableStreamSink {
committer.abortJob(job)
throw new CarbonStreamException("Job failed to write data file", t)
}
- committer.commitJob(job, result)
+ committer.commitJob(job, result.map(_._1))
LOGGER.info(s"Job ${ job.getJobID } committed.")
}
}
@@ -302,8 +302,7 @@ object CarbonAppendableStreamSink {
sparkAttemptNumber: Int,
committer: FileCommitProtocol,
iterator: Iterator[InternalRow],
- rowSchema: StructType
- ): TaskCommitMessage = {
+ rowSchema: StructType): (TaskCommitMessage, StreamFileIndex) = {
val jobId = CarbonInputFormatUtil.getJobId(new Date, sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -325,6 +324,7 @@ object CarbonAppendableStreamSink {
committer.setupTask(taskAttemptContext)
try {
+ var blockIndex: StreamFileIndex = null
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
val parserName = taskAttemptContext.getConfiguration.get(
@@ -335,13 +335,13 @@ object CarbonAppendableStreamSink {
Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser]
streamParser.initialize(taskAttemptContext.getConfiguration, rowSchema)
- StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
+ blockIndex = StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
taskAttemptContext, carbonLoadModel)
})(catchBlock = {
committer.abortTask(taskAttemptContext)
LOGGER.error(s"Job $jobId aborted.")
})
- committer.commitTask(taskAttemptContext)
+ (committer.commitTask(taskAttemptContext), blockIndex)
} catch {
case t: Throwable =>
throw new CarbonStreamException("Task failed while writing rows", t)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 4dde81e..ab5539a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.Executors
import scala.collection.mutable
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -42,6 +43,7 @@ import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.spark.exception.ProcessMetaDataException
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
import org.apache.carbondata.streaming.parser.CarbonStreamParser
class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
@@ -656,6 +658,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
}
test("query on stream table with dictionary, sort_columns") {
+ val batchParts =
+ partitionNums("select * from streaming.stream_table_filter")
+
executeStreamingIngest(
tableName = "stream_table_filter",
batchNums = 2,
@@ -668,6 +673,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
autoHandoff = false
)
+ val totalParts =
+ partitionNums("select * from streaming.stream_table_filter")
+ assert(totalParts > batchParts)
+
+ val streamParts = totalParts - batchParts
+
// non-filter
val result = sql("select * from streaming.stream_table_filter order by id, name").collect()
assert(result != null)
@@ -680,63 +691,79 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
assert(result(50).getString(1) == "batch_1")
// filter
+ assert(batchParts >= partitionNums("select * from stream_table_filter where id >= 100000001"))
+
checkAnswer(
sql("select * from stream_table_filter where id = 1"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where id = 1"))
checkAnswer(
sql("select * from stream_table_filter where id > 49 and id < 100000002"),
Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where id > 49 and id < 100000002"))
checkAnswer(
sql("select * from stream_table_filter where id between 50 and 100000001"),
Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where id between 50 and 100000001"))
checkAnswer(
sql("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"))
checkAnswer(
sql("select * from stream_table_filter where name = 'name_3'"),
Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where name = 'name_3'"))
checkAnswer(
sql("select * from stream_table_filter where name like '%me_3%' and id < 30"),
Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where name like '%me_3%' and id < 30"))
checkAnswer(sql("select count(*) from stream_table_filter where name like '%ame%'"),
Seq(Row(49)))
+ assert(totalParts == partitionNums("select count(*) from stream_table_filter where name like '%ame%'"))
checkAnswer(sql("select count(*) from stream_table_filter where name like '%batch%'"),
Seq(Row(5)))
+ assert(totalParts == partitionNums("select count(*) from stream_table_filter where name like '%batch%'"))
checkAnswer(
sql("select * from stream_table_filter where name >= 'name_3' and id < 4"),
Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where name >= 'name_3' and id < 4"))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')"))
checkAnswer(
sql("select * from stream_table_filter where city = 'city_1'"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where city = 'city_1'"))
checkAnswer(
sql("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)"))
checkAnswer(sql("select count(*) from stream_table_filter where city like '%city%'"),
Seq(Row(54)))
+ assert(totalParts == partitionNums("select count(*) from stream_table_filter where city like '%city%'"))
checkAnswer(
sql("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"),
Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"))
checkAnswer(
sql("select * from stream_table_filter where city between 'city_09' and 'city_1'"),
@@ -746,204 +773,252 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')"))
checkAnswer(
sql("select * from stream_table_filter where salary = 90000"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where salary = 90000"))
checkAnswer(
sql("select * from stream_table_filter where salary > 80000 and salary <= 100000"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where salary > 80000 and salary <= 100000"))
checkAnswer(
sql("select * from stream_table_filter where salary between 80001 and 90000"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where salary between 80001 and 90000"))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)"))
checkAnswer(
sql("select * from stream_table_filter where tax = 0.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where tax = 0.04 and id < 100"))
checkAnswer(
sql("select * from stream_table_filter where tax >= 0.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where tax >= 0.04 and id < 100"))
checkAnswer(
sql("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100"))
checkAnswer(
sql("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100"))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10) and tax <> 0.01"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and tax <> 0.01"))
checkAnswer(
sql("select * from stream_table_filter where percent = 80.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where percent = 80.04 and id < 100"))
checkAnswer(
sql("select * from stream_table_filter where percent >= 80.04 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where percent >= 80.04 and id < 100"))
checkAnswer(
sql("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100"))
checkAnswer(
sql("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100"))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10) and percent <> 80.01"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and percent <> 80.01"))
checkAnswer(
sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"))
checkAnswer(
sql("select * from stream_table_filter where birthday = '1990-01-04'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where birthday = '1990-01-04'"))
checkAnswer(
sql("select * from stream_table_filter where birthday > '1990-01-03' and birthday <= '1990-01-04'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where birthday > '1990-01-03' and birthday <= '1990-01-04'"))
checkAnswer(
sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'"))
checkAnswer(
sql("select * from stream_table_filter where register = '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where register = '2010-01-04 10:01:01'"))
checkAnswer(
sql("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"))
checkAnswer(
sql("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'"))
checkAnswer(
sql("select * from stream_table_filter where updated = '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where updated = '2010-01-04 10:01:01'"))
checkAnswer(
sql("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"))
checkAnswer(
sql("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")),
Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"))
checkAnswer(
sql("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'"),
Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'"))
checkAnswer(
sql("select * from stream_table_filter where id is null order by name"),
Seq(Row(null, "", "", null, null, null, null, null, null),
Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts >= partitionNums("select * from stream_table_filter where id is null order by name"))
checkAnswer(
sql("select * from stream_table_filter where name = ''"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where name = ''"))
checkAnswer(
sql("select * from stream_table_filter where id is null and name <> ''"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts == partitionNums("select * from stream_table_filter where id is null and name <> ''"))
checkAnswer(
sql("select * from stream_table_filter where city = ''"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
+ assert(streamParts >= partitionNums("select * from stream_table_filter where city = ''"))
checkAnswer(
sql("select * from stream_table_filter where id is null and city <> ''"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts == partitionNums("select * from stream_table_filter where id is null and city <> ''"))
checkAnswer(
sql("select * from stream_table_filter where salary is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
+ assert(totalParts == partitionNums("select * from stream_table_filter where salary is null"))
checkAnswer(
sql("select * from stream_table_filter where id is null and salary is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts == partitionNums("select * from stream_table_filter where id is null and salary is not null"))
checkAnswer(
sql("select * from stream_table_filter where tax is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
+ assert(totalParts == partitionNums("select * from stream_table_filter where tax is null"))
checkAnswer(
sql("select * from stream_table_filter where id is null and tax is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts == partitionNums("select * from stream_table_filter where id is null and tax is not null"))
checkAnswer(
sql("select * from stream_table_filter where percent is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
+ assert(totalParts == partitionNums("select * from stream_table_filter where percent is null"))
checkAnswer(
sql("select * from stream_table_filter where id is null and percent is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts == partitionNums("select * from stream_table_filter where id is null and percent is not null"))
checkAnswer(
sql("select * from stream_table_filter where birthday is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
+ assert(1 == partitionNums("select * from stream_table_filter where birthday is null"))
checkAnswer(
sql("select * from stream_table_filter where id is null and birthday is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts == partitionNums("select * from stream_table_filter where id is null and birthday is not null"))
checkAnswer(
sql("select * from stream_table_filter where register is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
+ assert(1 == partitionNums("select * from stream_table_filter where register is null"))
checkAnswer(
sql("select * from stream_table_filter where id is null and register is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts == partitionNums("select * from stream_table_filter where id is null and register is not null"))
checkAnswer(
sql("select * from stream_table_filter where updated is null"),
Seq(Row(null, "", "", null, null, null, null, null, null)))
+ assert(1 == partitionNums("select * from stream_table_filter where updated is null"))
checkAnswer(
sql("select * from stream_table_filter where id is null and updated is not null"),
Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+ assert(totalParts == partitionNums("select * from stream_table_filter where id is null and updated is not null"))
// agg
checkAnswer(
sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
"from stream_table_filter where id >= 2 and id <= 100000004"),
Seq(Row(51, 100000004, "batch_1", 7843162, 400001276)))
+ assert(totalParts >= partitionNums(
+ "select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
+ "from stream_table_filter where id >= 2 and id <= 100000004"))
checkAnswer(
sql("select city, count(id), sum(id), cast(avg(id) as integer), " +
@@ -956,6 +1031,14 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1),
Row("city_2", 1, 100000002, 100000002, 0.2, 0.2),
Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3)))
+ assert(totalParts >= partitionNums(
+ "select city, count(id), sum(id), cast(avg(id) as integer), " +
+ "max(salary), min(salary) " +
+ "from stream_table_filter " +
+ "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+ "and city <> '' " +
+ "group by city " +
+ "order by city"))
// batch loading
for(_ <- 0 to 2) {
@@ -2508,4 +2591,17 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
serverSocket
}
+ def findCarbonScanRDD(rdd: RDD[_]): RDD[_] = {
+ if (rdd.isInstanceOf[CarbonScanRDD[_]]) {
+ rdd
+ } else {
+ findCarbonScanRDD(rdd.dependencies(0).rdd)
+ }
+ }
+
+ def partitionNums(sqlString : String): Int = {
+ val rdd = findCarbonScanRDD(sql(sqlString).rdd)
+ rdd.partitions.length
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
index c4b501d..02e0ddf 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -52,6 +53,7 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.format.BlockletHeader;
@@ -413,7 +415,19 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
}
private boolean isScanRequired(BlockletHeader header) {
- // TODO require to implement min-max index
+ if (filter != null && header.getBlocklet_index() != null) {
+ BlockletMinMaxIndex minMaxIndex = CarbonMetadataUtil.convertExternalMinMaxIndex(
+ header.getBlocklet_index().getMin_max_index());
+ if (minMaxIndex != null) {
+ BitSet bitSet =
+ filter.isScanRequired(minMaxIndex.getMaxValues(), minMaxIndex.getMinValues());
+ if (bitSet.isEmpty()) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+ }
return true;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index bd622f0..6325528 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -30,10 +30,12 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -51,12 +53,12 @@ import org.apache.carbondata.processing.loading.parser.RowParser;
import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.streaming.segment.StreamSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
-
/**
* Stream record writer
*/
@@ -95,6 +97,10 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
private boolean isFirstRow = true;
private boolean hasException = false;
+ // batch level stats collector
+ private BlockletMinMaxIndex batchMinMaxIndex;
+ private boolean isClosed = false;
+
CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
initialize(job);
}
@@ -132,7 +138,6 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
}
private void initializeAtFirstRow() throws IOException, InterruptedException {
-
// initialize metadata
isNoDictionaryDimensionColumn =
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
@@ -144,20 +149,19 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
measureDataTypes[i] =
dataFields[dimensionWithComplexCount + i].getColumn().getDataType();
}
-
// initialize parser and converter
rowParser = new RowParserImpl(dataFields, configuration);
badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
configuration.setCardinalityFinder(converter);
converter.initialize();
-
// initialize encoder
nullBitSet = new BitSet(dataFields.length);
int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
- output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize);
-
+ output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize,
+ isNoDictionaryDimensionColumn.length, measureCount,
+ measureDataTypes);
// initialize data writer
String filePath = segmentDir + File.separator + fileName;
FileFactory.FileType fileType = FileFactory.getFileType(filePath);
@@ -170,7 +174,6 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
outputStream = FileFactory.getDataOutputStream(filePath, fileType);
writeFileHeader();
}
-
isFirstRow = false;
}
@@ -178,7 +181,6 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
if (isFirstRow) {
initializeAtFirstRow();
}
-
// null bit set
nullBitSet.clear();
Object[] rowData = (Object[]) value;
@@ -203,7 +205,6 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
}
int dimCount = 0;
Object columnValue;
-
// primitive type dimension
for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
columnValue = currentRow.getObject(dimCount);
@@ -212,9 +213,13 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
byte[] col = (byte[]) columnValue;
output.writeShort(col.length);
output.writeBytes(col);
+ output.dimStatsCollectors[dimCount].update(col);
} else {
output.writeInt((int) columnValue);
+ output.dimStatsCollectors[dimCount].update(ByteUtil.toBytes((int) columnValue));
}
+ } else {
+ output.dimStatsCollectors[dimCount].updateNull(0);
}
}
// complex type dimension
@@ -234,19 +239,25 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
dataType = measureDataTypes[msrCount];
if (dataType == DataTypes.BOOLEAN) {
output.writeBoolean((boolean) columnValue);
+ output.msrStatsCollectors[msrCount].update((byte) ((boolean) columnValue ? 1 : 0));
} else if (dataType == DataTypes.SHORT) {
output.writeShort((short) columnValue);
+ output.msrStatsCollectors[msrCount].update((short) columnValue);
} else if (dataType == DataTypes.INT) {
output.writeInt((int) columnValue);
+ output.msrStatsCollectors[msrCount].update((int) columnValue);
} else if (dataType == DataTypes.LONG) {
output.writeLong((long) columnValue);
+ output.msrStatsCollectors[msrCount].update((long) columnValue);
} else if (dataType == DataTypes.DOUBLE) {
output.writeDouble((double) columnValue);
+ output.msrStatsCollectors[msrCount].update((double) columnValue);
} else if (DataTypes.isDecimal(dataType)) {
BigDecimal val = (BigDecimal) columnValue;
byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
output.writeShort(bigDecimalInBytes.length);
output.writeBytes(bigDecimalInBytes);
+ output.msrStatsCollectors[msrCount].update((BigDecimal) columnValue);
} else {
String msg =
"unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
@@ -254,6 +265,8 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
LOGGER.error(msg);
throw new IOException(msg);
}
+ } else {
+ output.msrStatsCollectors[msrCount].updateNull(0);
}
}
}
@@ -294,12 +307,27 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
}
output.apppendBlocklet(outputStream);
outputStream.flush();
+ if (!isClosed) {
+ batchMinMaxIndex = StreamSegment.mergeBlockletMinMax(
+ batchMinMaxIndex, output.generateBlockletMinMax(), measureDataTypes);
+ }
// reset data
output.reset();
}
- @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ public BlockletMinMaxIndex getBatchMinMaxIndex() {
+ return StreamSegment.mergeBlockletMinMax(
+ batchMinMaxIndex, output.generateBlockletMinMax(), measureDataTypes);
+ }
+
+ public DataType[] getMeasureDataTypes() {
+ return measureDataTypes;
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
try {
+ isClosed = true;
// append remain buffer data
if (!hasException && !isFirstRow) {
appendBlockletToDataFile();
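appendBlockletToDataFile() folds each flushed blocklet's min/max into a running batch-level index via StreamSegment.mergeBlockletMinMax, and close() carries the final result into the stream index. The merge keeps, per column, the smaller of the two mins and the larger of the two maxes. A hedged sketch of that merge for byte-encoded columns (illustrative only; the patch uses typed comparators for measure columns rather than raw byte order):

public class MergeMinMaxSketch {

  // unsigned lexicographic comparison of two byte arrays
  static int compareUnsigned(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int d = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (d != 0) {
        return d;
      }
    }
    return a.length - b.length;
  }

  // fold one blocklet's per-column [min, max] into the running batch index
  static void merge(byte[][] batchMin, byte[][] batchMax,
      byte[][] blockletMin, byte[][] blockletMax) {
    for (int col = 0; col < batchMin.length; col++) {
      if (compareUnsigned(blockletMin[col], batchMin[col]) < 0) {
        batchMin[col] = blockletMin[col];
      }
      if (compareUnsigned(blockletMax[col], batchMax[col]) > 0) {
        batchMax[col] = blockletMax[col];
      }
    }
  }
}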
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
index 7b2176b..d4322b4 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
@@ -22,10 +22,20 @@ import java.io.IOException;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.BlockletIndex;
import org.apache.carbondata.format.BlockletInfo;
import org.apache.carbondata.format.MutationType;
+import org.apache.carbondata.streaming.segment.StreamSegment;
/**
* stream blocklet writer
@@ -39,13 +49,49 @@ public class StreamBlockletWriter {
private int rowIndex = -1;
private Compressor compressor = CompressorFactory.getInstance().getCompressor();
- StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize) {
+ private int dimCountWithoutComplex;
+ private int measureCount;
+ private DataType[] measureDataTypes;
+
+ // blocklet level stats
+ ColumnPageStatsCollector[] dimStatsCollectors;
+ ColumnPageStatsCollector[] msrStatsCollectors;
+ // blocklet level Min/Max
+ private BlockletMinMaxIndex blockletMinMaxIndex;
+
+ StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize, int dimCountWithoutComplex,
+ int measureCount, DataType[] measureDataTypes) {
buffer = new byte[maxSize];
this.maxSize = maxSize;
this.maxRowNum = maxRowNum;
this.rowSize = rowSize;
+ this.dimCountWithoutComplex = dimCountWithoutComplex;
+ this.measureCount = measureCount;
+ this.measureDataTypes = measureDataTypes;
+ initializeStatsCollector();
+ }
+
+ private void initializeStatsCollector() {
+ // dimension stats collectors
+ // stats are not collected for complex type columns,
+ // so this only covers dictionary and no-dictionary dimensions
+ dimStatsCollectors = new ColumnPageStatsCollector[dimCountWithoutComplex];
+ // measure stats collectors
+ msrStatsCollectors = new ColumnPageStatsCollector[measureCount];
+
+ int dimCount = 0;
+ for (; dimCount < dimCountWithoutComplex; dimCount++) {
+ dimStatsCollectors[dimCount] =
+ KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
+ }
+
+ for (int msrCount = 0; msrCount < measureCount; msrCount++) {
+ msrStatsCollectors[msrCount] =
+ PrimitivePageStatsCollector.newInstance(measureDataTypes[msrCount]);
+ }
}
+
private void ensureCapacity(int space) {
int newcount = space + count;
if (newcount > buffer.length) {
@@ -58,6 +104,8 @@ public class StreamBlockletWriter {
void reset() {
count = 0;
rowIndex = -1;
+ initializeStatsCollector();
+ blockletMinMaxIndex = null;
}
byte[] getBytes() {
@@ -134,6 +182,36 @@ public class StreamBlockletWriter {
count += len;
}
+ private SimpleStatsResult[] getDimStats() {
+ if (dimStatsCollectors == null) {
+ return new SimpleStatsResult[0];
+ }
+ SimpleStatsResult[] stats = new SimpleStatsResult[dimStatsCollectors.length];
+ int dimCount = 0;
+ for (; dimCount < dimStatsCollectors.length; dimCount++) {
+ stats[dimCount] = dimStatsCollectors[dimCount].getPageStats();
+ }
+ return stats;
+ }
+
+ private SimpleStatsResult[] getMsrStats() {
+ if (msrStatsCollectors == null) {
+ return new SimpleStatsResult[0];
+ }
+ SimpleStatsResult[] stats = new SimpleStatsResult[msrStatsCollectors.length];
+ for (int mrsCount = 0; mrsCount < msrStatsCollectors.length; mrsCount++) {
+ stats[mrsCount] = msrStatsCollectors[mrsCount].getPageStats();
+ }
+ return stats;
+ }
+
+ BlockletMinMaxIndex generateBlockletMinMax() {
+ if (blockletMinMaxIndex == null) {
+ blockletMinMaxIndex = StreamSegment.collectMinMaxIndex(getDimStats(), getMsrStats());
+ }
+ return blockletMinMaxIndex;
+ }
+
void apppendBlocklet(DataOutputStream outputStream) throws IOException {
outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
@@ -143,6 +221,13 @@ public class StreamBlockletWriter {
blockletHeader.setBlocklet_length(getCount());
blockletHeader.setMutation(MutationType.INSERT);
blockletHeader.setBlocklet_info(blockletInfo);
+ // add blocklet level min/max
+ blockletMinMaxIndex = generateBlockletMinMax();
+ if (blockletInfo.getNum_rows() > 1) {
+ BlockletIndex blockletIndex = new BlockletIndex();
+ blockletIndex.setMin_max_index(CarbonMetadataUtil.convertMinMaxIndex(blockletMinMaxIndex));
+ blockletHeader.setBlocklet_index(blockletIndex);
+ }
byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader);
outputStream.writeInt(headerBytes.length);
outputStream.write(headerBytes);
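StreamBlockletWriter now keeps one stats collector per dimension and per measure, updates it on every row written, and turns the collected values into a BlockletIndex inside the blocklet header at flush time. For intuition, a self-contained sketch of what such a collector tracks for a primitive int measure (illustrative, not the PrimitivePageStatsCollector API):

public class IntStatsCollectorSketch {
  private int min = Integer.MAX_VALUE;
  private int max = Integer.MIN_VALUE;
  private boolean sawValue = false;

  // called once per non-null row, mirroring msrStatsCollectors[i].update(value)
  void update(int value) {
    sawValue = true;
    if (value < min) min = value;
    if (value > max) max = value;
  }

  // null values do not move min/max, mirroring updateNull(rowId)
  void updateNull() { }

  int getMin() { return min; }
  int getMax() { return max; }
  boolean isEmpty() { return !sawValue; }
}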
http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
new file mode 100644
index 0000000..fa8a694
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming.index;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+@InterfaceAudience.Internal
+public class StreamFileIndex implements Serializable {
+
+ /**
+ * the file name only; it does not contain the full path.
+ */
+ private String fileName;
+
+ private BlockletMinMaxIndex minMaxIndex;
+
+ private long rowCount;
+
+ private DataType[] msrDataTypes;
+
+ public StreamFileIndex(String fileName, BlockletMinMaxIndex minMaxIndex, long rowCount) {
+ this.fileName = fileName;
+ this.minMaxIndex = minMaxIndex;
+ this.rowCount = rowCount;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ public BlockletMinMaxIndex getMinMaxIndex() {
+ return minMaxIndex;
+ }
+
+ public void setMinMaxIndex(BlockletMinMaxIndex minMaxIndex) {
+ this.minMaxIndex = minMaxIndex;
+ }
+
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ public void setRowCount(long rowCount) {
+ this.rowCount = rowCount;
+ }
+
+ public DataType[] getMsrDataTypes() {
+ return msrDataTypes;
+ }
+
+ public void setMsrDataTypes(DataType[] msrDataTypes) {
+ this.msrDataTypes = msrDataTypes;
+ }
+}