You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2019/06/21 11:04:40 UTC
[carbondata] branch master updated: [CARBONDATA-3373] Fixed measure
column filter perf
This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 8ce717a [CARBONDATA-3373] Fixed measure column filter perf
8ce717a is described below
commit 8ce717a281088f8b8729d81806fb8b54595b13b1
Author: manhua <ke...@qq.com>
AuthorDate: Wed Jun 19 11:15:30 2019 +0800
[CARBONDATA-3373] Fixed measure column filter perf
when sql with 'in numbers' and spark.sql.codegen.wholeStage is false, the query is slow;
the reason is that the CarbonScan row-level filter's time complexity is O(n^2); we can replace the list with a hashset to improve query performance
sql example: select * from xx where field in (1,2,3,4,5,6)
This closes #3295
---
core/pom.xml | 5 +
.../core/scan/filter/FilterExecutorUtil.java | 152 ++++++++++++++++++
.../carbondata/core/scan/filter/FilterUtil.java | 2 +-
.../scan/filter/executer/BitSetUpdaterFactory.java | 61 ++++++++
.../filter/executer/ExcludeFilterExecuterImpl.java | 34 ++--
...terFilterInfo.java => FilterBitSetUpdater.java} | 14 +-
.../filter/executer/IncludeFilterExecuterImpl.java | 45 ++----
.../executer/MeasureColumnExecuterFilterInfo.java | 56 ++++++-
.../spark/testsuite/filterexpr/TestInFilter.scala | 173 +++++++++++++++++++++
9 files changed, 474 insertions(+), 68 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 65cb686..41481af 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -109,6 +109,11 @@
<version>0.5.11</version>
</dependency>
<dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ <version>8.2.3</version>
+ </dependency>
+ <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.42.Final</version>
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExecutorUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExecutorUtil.java
new file mode 100644
index 0000000..4000687
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExecutorUtil.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter;
+
+import java.util.AbstractCollection;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.filter.executer.FilterBitSetUpdater;
+import org.apache.carbondata.core.scan.filter.executer.MeasureColumnExecuterFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+import it.unimi.dsi.fastutil.booleans.BooleanOpenHashSet;
+import it.unimi.dsi.fastutil.bytes.ByteOpenHashSet;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.shorts.ShortOpenHashSet;
+
+/**
+ * Utility class for executing the filter
+ */
+public class FilterExecutorUtil {
+ /**
+ * Below method will be used to execute measure filter based on data type
+ * This is done to avoid conversion of primitive type to primitive object
+ * as it may cause lots of gc when the number of records is high and will impact performance
+ *
+ * @param page
+ * @param bitSet
+ * @param measureColumnExecuterFilterInfo
+ * @param measureColumnResolvedFilterInfo
+ * @param filterBitSetUpdater
+ */
+ public static void executeIncludeExcludeFilterForMeasure(ColumnPage page, BitSet bitSet,
+ MeasureColumnExecuterFilterInfo measureColumnExecuterFilterInfo,
+ MeasureColumnResolvedFilterInfo measureColumnResolvedFilterInfo,
+ FilterBitSetUpdater filterBitSetUpdater) {
+ final CarbonMeasure measure = measureColumnResolvedFilterInfo.getMeasure();
+ final DataType dataType = FilterUtil.getMeasureDataType(measureColumnResolvedFilterInfo);
+ int numberOfRows = page.getPageSize();
+ BitSet nullBitSet = page.getNullBits();
+ Object[] filterKeys = measureColumnExecuterFilterInfo.getFilterKeys();
+ // to handle the null value
+ for (int i = 0; i < filterKeys.length; i++) {
+ if (filterKeys[i] == null) {
+ for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
+ bitSet.flip(j);
+ }
+ }
+ }
+ AbstractCollection filterSet = measureColumnExecuterFilterInfo.getFilterSet();
+ if (dataType == DataTypes.BYTE) {
+ ByteOpenHashSet byteOpenHashSet = (ByteOpenHashSet) filterSet;
+ for (int i = 0; i < numberOfRows; i++) {
+ if (!nullBitSet.get(i)) {
+ if (byteOpenHashSet.contains((byte) page.getLong(i))) {
+ filterBitSetUpdater.updateBitset(bitSet, i);
+ }
+ }
+ }
+ } else if (dataType == DataTypes.BOOLEAN) {
+ BooleanOpenHashSet booleanOpenHashSet = (BooleanOpenHashSet) filterSet;
+ for (int i = 0; i < numberOfRows; i++) {
+ if (!nullBitSet.get(i)) {
+ if (booleanOpenHashSet.contains(page.getBoolean(i))) {
+ filterBitSetUpdater.updateBitset(bitSet, i);
+ }
+ }
+ }
+ } else if (dataType == DataTypes.SHORT) {
+ ShortOpenHashSet shortOpenHashSet = (ShortOpenHashSet) filterSet;
+ for (int i = 0; i < numberOfRows; i++) {
+ if (!nullBitSet.get(i)) {
+ if (shortOpenHashSet.contains((short) page.getLong(i))) {
+ filterBitSetUpdater.updateBitset(bitSet, i);
+ }
+ }
+ }
+ } else if (dataType == DataTypes.INT) {
+ IntOpenHashSet intOpenHashSet = (IntOpenHashSet) filterSet;
+ for (int i = 0; i < numberOfRows; i++) {
+ if (!nullBitSet.get(i)) {
+ if (intOpenHashSet.contains((int) page.getLong(i))) {
+ filterBitSetUpdater.updateBitset(bitSet, i);
+ }
+ }
+ }
+ } else if (dataType == DataTypes.FLOAT) {
+ FloatOpenHashSet floatOpenHashSet = (FloatOpenHashSet) filterSet;
+ for (int i = 0; i < numberOfRows; i++) {
+ if (!nullBitSet.get(i)) {
+ if (floatOpenHashSet.contains((float) page.getDouble(i))) {
+ filterBitSetUpdater.updateBitset(bitSet, i);
+ }
+ }
+ }
+ } else if (dataType == DataTypes.DOUBLE) {
+ DoubleOpenHashSet doubleOpenHashSet = (DoubleOpenHashSet) filterSet;
+ for (int i = 0; i < numberOfRows; i++) {
+ if (!nullBitSet.get(i)) {
+ if (doubleOpenHashSet.contains(page.getDouble(i))) {
+ filterBitSetUpdater.updateBitset(bitSet, i);
+ }
+ }
+ }
+ } else if (dataType == DataTypes.LONG) {
+ LongOpenHashSet longOpenHashSet = (LongOpenHashSet) filterSet;
+ for (int i = 0; i < numberOfRows; i++) {
+ if (!nullBitSet.get(i)) {
+ if (longOpenHashSet.contains(page.getLong(i))) {
+ filterBitSetUpdater.updateBitset(bitSet, i);
+ }
+ }
+ }
+ } else if (DataTypes.isDecimal(dataType)) {
+ Set bigDecimalHashSet = (HashSet) filterSet;
+ for (int i = 0; i < numberOfRows; i++) {
+ if (!nullBitSet.get(i)) {
+ final Object measureObjectBasedOnDataType =
+ DataTypeUtil.getMeasureObjectBasedOnDataType(page, i, dataType, measure);
+ if (bigDecimalHashSet.contains(measureObjectBasedOnDataType)) {
+ filterBitSetUpdater.updateBitset(bitSet, i);
+ }
+ }
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid data type");
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index cef3af1..547ed39 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -1534,7 +1534,7 @@ public final class FilterUtil {
converter);
}
}
- msrColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
+ msrColumnExecuterInfo.setFilterKeys(keysBasedOnFilter, measures.getDataType());
} else {
if (filterValues == null) {
dimColumnExecuterInfo.setFilterKeys(new byte[0][]);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/BitSetUpdaterFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/BitSetUpdaterFactory.java
new file mode 100644
index 0000000..95bf09d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/BitSetUpdaterFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
+
+/**
+ * Class for updating the bitset
+ * If include it will set the bit
+ * If exclude it will flip the bit
+ */
+public final class BitSetUpdaterFactory {
+
+ public static final BitSetUpdaterFactory INSTANCE = new BitSetUpdaterFactory();
+
+ public FilterBitSetUpdater getBitSetUpdater(FilterExecuterType filterExecuterType) {
+ switch (filterExecuterType) {
+ case INCLUDE:
+ return new IncludeFilterBitSetUpdater();
+ case EXCLUDE:
+ return new ExcludeFilterBitSetUpdater();
+ default:
+ throw new UnsupportedOperationException(
+ "Invalid filter executor type:" + filterExecuterType);
+ }
+ }
+
+ /**
+ * Below class will be used to updating the bitset in case of include filter
+ */
+ static class IncludeFilterBitSetUpdater implements FilterBitSetUpdater {
+ @Override public void updateBitset(BitSet bitSet, int bitIndex) {
+ bitSet.set(bitIndex);
+ }
+ }
+
+ /**
+ * Below class will be used to updating the bitset in case of exclude filter
+ */
+ static class ExcludeFilterBitSetUpdater implements FilterBitSetUpdater {
+ @Override public void updateBitset(BitSet bitSet, int bitIndex) {
+ bitSet.flip(bitIndex);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index fc9fbae..36c510c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -26,7 +26,9 @@ import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterExecutorUtil;
import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
@@ -55,13 +57,19 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
private byte[][] filterValues;
+ private FilterBitSetUpdater filterBitSetUpdater;
+
public ExcludeFilterExecuterImpl(byte[][] filterValues, boolean isNaturalSorted) {
this.filterValues = filterValues;
this.isNaturalSorted = isNaturalSorted;
+ this.filterBitSetUpdater =
+ BitSetUpdaterFactory.INSTANCE.getBitSetUpdater(FilterExecuterType.EXCLUDE);
}
public ExcludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColEvaluatorInfo,
MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo, SegmentProperties segmentProperties,
boolean isMeasure) {
+ this.filterBitSetUpdater =
+ BitSetUpdaterFactory.INSTANCE.getBitSetUpdater(FilterExecuterType.EXCLUDE);
this.segmentProperties = segmentProperties;
if (!isMeasure) {
this.dimColEvaluatorInfo = dimColEvaluatorInfo;
@@ -188,30 +196,8 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
// the filter values. The one that matches sets it Bitset.
BitSet bitSet = new BitSet(numerOfRows);
bitSet.flip(0, numerOfRows);
- Object[] filterValues = msrColumnExecutorInfo.getFilterKeys();
- SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
- for (int i = 0; i < filterValues.length; i++) {
- BitSet nullBitSet = columnPage.getNullBits();
- if (filterValues[i] == null) {
- for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
- bitSet.flip(j);
- }
- continue;
- }
- for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
- if (!nullBitSet.get(startIndex)) {
- // Check if filterValue[i] matches with measure Values.
- Object msrValue = DataTypeUtil
- .getMeasureObjectBasedOnDataType(columnPage, startIndex,
- msrType, msrColumnEvaluatorInfo.getMeasure());
-
- if (comparator.compare(msrValue, filterValues[i]) == 0) {
- // This is a match.
- bitSet.flip(startIndex);
- }
- }
- }
- }
+ FilterExecutorUtil.executeIncludeExcludeFilterForMeasure(columnPage,bitSet,
+ msrColumnExecutorInfo, msrColumnEvaluatorInfo, filterBitSetUpdater);
return bitSet;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterBitSetUpdater.java
similarity index 79%
copy from core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
copy to core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterBitSetUpdater.java
index a19e617..39f0425 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterBitSetUpdater.java
@@ -16,15 +16,9 @@
*/
package org.apache.carbondata.core.scan.filter.executer;
-public class MeasureColumnExecuterFilterInfo {
+import java.util.BitSet;
- Object[] filterKeys;
-
- public void setFilterKeys(Object[] filterKeys) {
- this.filterKeys = filterKeys;
- }
-
- public Object[] getFilterKeys() {
- return filterKeys;
- }
+public interface FilterBitSetUpdater {
+ void updateBitset(BitSet bitSet, int bitIndex);
}
+
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 64dc3a1..1231aa0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -28,7 +28,9 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterExecutorUtil;
import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
@@ -57,15 +59,20 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
private byte[][] filterValues;
+ private FilterBitSetUpdater filterBitSetUpdater;
+
public IncludeFilterExecuterImpl(byte[][] filterValues, boolean isNaturalSorted) {
this.filterValues = filterValues;
this.isNaturalSorted = isNaturalSorted;
+ this.filterBitSetUpdater =
+ BitSetUpdaterFactory.INSTANCE.getBitSetUpdater(FilterExecuterType.INCLUDE);
}
public IncludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColumnEvaluatorInfo,
MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo, SegmentProperties segmentProperties,
boolean isMeasure) {
-
+ this.filterBitSetUpdater =
+ BitSetUpdaterFactory.INSTANCE.getBitSetUpdater(FilterExecuterType.INCLUDE);
this.segmentProperties = segmentProperties;
if (!isMeasure) {
this.dimColumnEvaluatorInfo = dimColumnEvaluatorInfo;
@@ -272,31 +279,8 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
// Get the measure values from the chunk. compare sequentially with the
// the filter values. The one that matches sets it Bitset.
BitSet bitSet = new BitSet(rowsInPage);
- Object[] filterValues = msrColumnExecutorInfo.getFilterKeys();
-
- SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
- BitSet nullBitSet = columnPage.getNullBits();
- for (int i = 0; i < filterValues.length; i++) {
- if (filterValues[i] == null) {
- for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
- bitSet.set(j);
- }
- continue;
- }
- for (int startIndex = 0; startIndex < rowsInPage; startIndex++) {
- if (!nullBitSet.get(startIndex)) {
- // Check if filterValue[i] matches with measure Values.
- Object msrValue = DataTypeUtil
- .getMeasureObjectBasedOnDataType(columnPage, startIndex,
- msrType, msrColumnEvaluatorInfo.getMeasure());
-
- if (comparator.compare(msrValue, filterValues[i]) == 0) {
- // This is a match.
- bitSet.set(startIndex);
- }
- }
- }
- }
+ FilterExecutorUtil.executeIncludeExcludeFilterForMeasure(columnPage,bitSet,
+ msrColumnExecutorInfo, msrColumnEvaluatorInfo, filterBitSetUpdater);
return bitSet;
}
@@ -525,12 +509,9 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
}
} else if (isMeasurePresentInCurrentBlock) {
chunkIndex = msrColumnEvaluatorInfo.getColumnIndexInMinMaxByteArray();
- if (isMinMaxSet[chunkIndex]) {
- isScanRequired = isScanRequired(blkMaxVal[chunkIndex], blkMinVal[chunkIndex],
- msrColumnExecutorInfo.getFilterKeys(), msrColumnEvaluatorInfo.getType());
- } else {
- isScanRequired = true;
- }
+ isScanRequired = isScanRequired(blkMaxVal[chunkIndex], blkMinVal[chunkIndex],
+ msrColumnExecutorInfo.getFilterKeys(),
+ msrColumnEvaluatorInfo.getType());
}
if (isScanRequired) {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
index a19e617..5b5b298 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
@@ -16,15 +16,69 @@
*/
package org.apache.carbondata.core.scan.filter.executer;
+import java.util.AbstractCollection;
+import java.util.HashSet;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+import it.unimi.dsi.fastutil.booleans.BooleanOpenHashSet;
+import it.unimi.dsi.fastutil.bytes.ByteOpenHashSet;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.shorts.ShortOpenHashSet;
+
+/**
+ * Below class will be used to keep all the filter values based on data type
+ * for measure column.
+ * In this class there are multiple type of set is used to avoid conversion of
+ * primitive type to primitive object to avoid gc which causes performance degradation when
+ * number of records are high
+ */
public class MeasureColumnExecuterFilterInfo {
Object[] filterKeys;
- public void setFilterKeys(Object[] filterKeys) {
+ /**
+ * filter set used for filtering the measure value based on data type
+ */
+ private AbstractCollection filterSet;
+
+ public void setFilterKeys(Object[] filterKeys, DataType dataType) {
this.filterKeys = filterKeys;
+ if (dataType == DataTypes.BOOLEAN) {
+ filterSet = new BooleanOpenHashSet();
+ } else if (dataType == DataTypes.BYTE) {
+ filterSet = new ByteOpenHashSet();
+ } else if (dataType == DataTypes.SHORT) {
+ filterSet = new ShortOpenHashSet();
+ } else if (dataType == DataTypes.INT) {
+ filterSet = new IntOpenHashSet();
+ } else if (dataType == DataTypes.FLOAT) {
+ filterSet = new FloatOpenHashSet();
+ } else if (dataType == DataTypes.LONG) {
+ filterSet = new LongOpenHashSet();
+ } else if (dataType == DataTypes.DOUBLE) {
+ filterSet = new DoubleOpenHashSet();
+ } else if (DataTypes.isDecimal(dataType)) {
+ filterSet = new HashSet();
+ } else {
+ throw new IllegalArgumentException("Invalid data type");
+ }
+ for (int i = 0; i < filterKeys.length; i++) {
+ if (null != filterKeys[i]) {
+ filterSet.add(filterKeys[i]);
+ }
+ }
}
public Object[] getFilterKeys() {
return filterKeys;
}
+
+ public AbstractCollection getFilterSet() {
+ return filterSet;
+ }
}
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestInFilter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestInFilter.scala
new file mode 100644
index 0000000..dd2b899
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestInFilter.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.filterexpr
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestInFilter extends QueryTest with BeforeAndAfterAll{
+
+ override def beforeAll: Unit = {
+ sql("drop table if exists test_table")
+ sql("create table test_table(intField INT, floatField FLOAT, doubleField DOUBLE, " +
+ "decimalField DECIMAL(18,2)) stored by 'carbondata'")
+
+ // turn on row level filter in carbon
+ // because only row level is on, 'in' will be pushed down into CarbonScanRDD
+ // or in filter will be handled by spark.
+ sql("set carbon.push.rowfilters.for.vector=true")
+ sql("insert into test_table values(8,8,8,8),(5,5.0,5.0,5.0),(4,1.00,2.00,3.00)," +
+ "(6,6.0000,6.0000,6.0000),(4743,4743.00,4743.0000,4743.0),(null,null,null,null)")
+ }
+
+ test("sql with in different measurement type") {
+ // the precision of filter value is less one digit than column value
+ // float type test
+ checkAnswer(
+ sql("select * from test_table where floatField in(1.0)"),
+ Seq(Row(4, 1.00, 2.00, 3.00)))
+ checkAnswer(
+ sql("select * from test_table where floatField in(4743.0)"),
+ Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+ checkAnswer(
+ sql("select * from test_table where floatField in(5)"),
+ Seq(Row(5, 5.0, 5.0, 5.0)))
+ checkAnswer(
+ sql("select * from test_table where floatField in(6.000)"),
+ Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+ // double type test
+ checkAnswer(
+ sql("select * from test_table where doubleField in(2.0)"),
+ Seq(Row(4, 1.00, 2.00, 3.00)))
+ checkAnswer(
+ sql("select * from test_table where doubleField in(4743.000)"),
+ Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+ checkAnswer(
+ sql("select * from test_table where doubleField in(5)"),
+ Seq(Row(5, 5.0, 5.0, 5.0)))
+ checkAnswer(
+ sql("select * from test_table where doubleField in(6.000)"),
+ Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+ // decimalField type test
+ checkAnswer(
+ sql("select * from test_table where decimalField in(3.0)"),
+ Seq(Row(4, 1.00, 2.00, 3.00)))
+ checkAnswer(
+ sql("select * from test_table where decimalField in(4743)"),
+ Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+ checkAnswer(
+ sql("select * from test_table where decimalField in(5)"),
+ Seq(Row(5, 5.0, 5.0, 5.0)))
+ checkAnswer(
+ sql("select * from test_table where decimalField in(6.000)"),
+ Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+ // the precision of filter value is more one digit than column value
+ // int type test
+ checkAnswer(
+ sql("select * from test_table where intField in(4.0)"),
+ Seq(Row(4, 1.00, 2.00, 3.00)))
+ checkAnswer(
+ sql("select * from test_table where intField in(4743.0)"),
+ Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+ checkAnswer(
+ sql("select * from test_table where intField in(5.0)"),
+ Seq(Row(5, 5.0, 5.0, 5.0)))
+ checkAnswer(
+ sql("select * from test_table where intField in(6.0)"),
+ Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+ // float type test
+ checkAnswer(
+ sql("select * from test_table where floatField in(1.000)"),
+ Seq(Row(4, 1.00, 2.00, 3.00)))
+ checkAnswer(
+ sql("select * from test_table where floatField in(4743.000)"),
+ Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+ checkAnswer(
+ sql("select * from test_table where floatField in(5.00)"),
+ Seq(Row(5, 5.0, 5.0, 5.0)))
+ checkAnswer(
+ sql("select * from test_table where floatField in(6.00000)"),
+ Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+ // double type test
+ checkAnswer(
+ sql("select * from test_table where doubleField in(2.000)"),
+ Seq(Row(4, 1.00, 2.00, 3.00)))
+ checkAnswer(
+ sql("select * from test_table where doubleField in(4743.00000)"),
+ Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+ checkAnswer(
+ sql("select * from test_table where doubleField in(5.00)"),
+ Seq(Row(5, 5.0, 5.0, 5.0)))
+ checkAnswer(
+ sql("select * from test_table where doubleField in(6.00000)"),
+ Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+ // decimalField type test
+ checkAnswer(
+ sql("select * from test_table where decimalField in(3.000)"),
+ Seq(Row(4, 1.00, 2.00, 3.00)))
+ checkAnswer(
+ sql("select * from test_table where decimalField in(4743.00)"),
+ Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+ checkAnswer(
+ sql("select * from test_table where decimalField in(5.00)"),
+ Seq(Row(5, 5.0, 5.0, 5.0)))
+ checkAnswer(
+ sql("select * from test_table where decimalField in(6.00000)"),
+ Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+ // case: filter value is null
+ checkAnswer(
+ sql("select * from test_table where decimalField is null"),
+ Seq(Row(null, null, null, null)))
+
+ // filter value and column 's precision are the same
+ checkAnswer(
+ sql("select * from test_table where doubleField in(5.0) " +
+ "and floatField in(5.0) and decimalField in(5.0) and intField in(5)"),
+ Seq(Row(5, 5.0, 5.0, 5.0)))
+ checkAnswer(
+ sql("select * from test_table where doubleField in(6.0000) " +
+ "and floatField in(6.0000) and decimalField in(6.0000) and intField in(6.0000)"),
+ Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+ checkAnswer(
+ sql("select * from test_table where doubleField in(8) " +
+ "and floatField in(8) and decimalField in(8) and intField in(8)"),
+ Seq(Row(8, 8, 8, 8)))
+ checkAnswer(
+ sql("select * from test_table where doubleField in(4743.0000) " +
+ "and floatField in(4743.00) and decimalField in(4743.0) and intField in(4743)"),
+ Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+ checkAnswer(
+ sql("select * from test_table where doubleField in(2.00) " +
+ "and floatField in(1.00) and decimalField in(3.00) and intField in(4)"),
+ Seq(Row(4, 1.00, 2.00, 3.00)))
+ }
+
+ override def afterAll(): Unit = {
+ sql("drop table if exists test_table")
+ sql("set carbon.push.rowfilters.for.vector=false")
+ }
+
+}