You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2019/06/21 11:04:40 UTC

[carbondata] branch master updated: [CARBONDATA-3373] Fixed measure column filter perf

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ce717a  [CARBONDATA-3373] Fixed measure column filter perf
8ce717a is described below

commit 8ce717a281088f8b8729d81806fb8b54595b13b1
Author: manhua <ke...@qq.com>
AuthorDate: Wed Jun 19 11:15:30 2019 +0800

    [CARBONDATA-3373] Fixed measure column filter perf
    
    when a SQL query uses 'in (numbers)' and spark.sql.codegen.wholeStage is false, the query is slow.
    The reason is that the CarbonScan row-level filter's time complexity is O(n^2); we can replace the list with a hash set to improve query performance
    
    sql example: select * from xx where field in (1,2,3,4,5,6)
    
    This closes #3295
---
 core/pom.xml                                       |   5 +
 .../core/scan/filter/FilterExecutorUtil.java       | 152 ++++++++++++++++++
 .../carbondata/core/scan/filter/FilterUtil.java    |   2 +-
 .../scan/filter/executer/BitSetUpdaterFactory.java |  61 ++++++++
 .../filter/executer/ExcludeFilterExecuterImpl.java |  34 ++--
 ...terFilterInfo.java => FilterBitSetUpdater.java} |  14 +-
 .../filter/executer/IncludeFilterExecuterImpl.java |  45 ++----
 .../executer/MeasureColumnExecuterFilterInfo.java  |  56 ++++++-
 .../spark/testsuite/filterexpr/TestInFilter.scala  | 173 +++++++++++++++++++++
 9 files changed, 474 insertions(+), 68 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 65cb686..41481af 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -109,6 +109,11 @@
       <version>0.5.11</version>
     </dependency>
     <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+      <version>8.2.3</version>
+    </dependency>
+    <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
       <version>4.0.42.Final</version>
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExecutorUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExecutorUtil.java
new file mode 100644
index 0000000..4000687
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExecutorUtil.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter;
+
+import java.util.AbstractCollection;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.filter.executer.FilterBitSetUpdater;
+import org.apache.carbondata.core.scan.filter.executer.MeasureColumnExecuterFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+import it.unimi.dsi.fastutil.booleans.BooleanOpenHashSet;
+import it.unimi.dsi.fastutil.bytes.ByteOpenHashSet;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.shorts.ShortOpenHashSet;
+
+/**
+ * Utility class for executing the filter
+ */
+public class FilterExecutorUtil {
+  /**
+   * Below method will be used to execute measure filter based on data type
+   * This is done to avoid conversion of primitive type to primitive object
+   * as it may cause lots of gc when number of record is high and will impact performance
+   *
+   * @param page
+   * @param bitSet
+   * @param measureColumnExecuterFilterInfo
+   * @param measureColumnResolvedFilterInfo
+   * @param filterBitSetUpdater
+   */
+  public static void executeIncludeExcludeFilterForMeasure(ColumnPage page, BitSet bitSet,
+      MeasureColumnExecuterFilterInfo measureColumnExecuterFilterInfo,
+      MeasureColumnResolvedFilterInfo measureColumnResolvedFilterInfo,
+      FilterBitSetUpdater filterBitSetUpdater) {
+    final CarbonMeasure measure = measureColumnResolvedFilterInfo.getMeasure();
+    final DataType dataType = FilterUtil.getMeasureDataType(measureColumnResolvedFilterInfo);
+    int numberOfRows = page.getPageSize();
+    BitSet nullBitSet = page.getNullBits();
+    Object[] filterKeys = measureColumnExecuterFilterInfo.getFilterKeys();
+    // to handle the null value
+    for (int i = 0; i < filterKeys.length; i++) {
+      if (filterKeys[i] == null) {
+        for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
+          bitSet.flip(j);
+        }
+      }
+    }
+    AbstractCollection filterSet = measureColumnExecuterFilterInfo.getFilterSet();
+    if (dataType == DataTypes.BYTE) {
+      ByteOpenHashSet byteOpenHashSet = (ByteOpenHashSet) filterSet;
+      for (int i = 0; i < numberOfRows; i++) {
+        if (!nullBitSet.get(i)) {
+          if (byteOpenHashSet.contains((byte) page.getLong(i))) {
+            filterBitSetUpdater.updateBitset(bitSet, i);
+          }
+        }
+      }
+    } else if (dataType == DataTypes.BOOLEAN) {
+      BooleanOpenHashSet booleanOpenHashSet = (BooleanOpenHashSet) filterSet;
+      for (int i = 0; i < numberOfRows; i++) {
+        if (!nullBitSet.get(i)) {
+          if (booleanOpenHashSet.contains(page.getBoolean(i))) {
+            filterBitSetUpdater.updateBitset(bitSet, i);
+          }
+        }
+      }
+    } else if (dataType == DataTypes.SHORT) {
+      ShortOpenHashSet shortOpenHashSet = (ShortOpenHashSet) filterSet;
+      for (int i = 0; i < numberOfRows; i++) {
+        if (!nullBitSet.get(i)) {
+          if (shortOpenHashSet.contains((short) page.getLong(i))) {
+            filterBitSetUpdater.updateBitset(bitSet, i);
+          }
+        }
+      }
+    } else if (dataType == DataTypes.INT) {
+      IntOpenHashSet intOpenHashSet = (IntOpenHashSet) filterSet;
+      for (int i = 0; i < numberOfRows; i++) {
+        if (!nullBitSet.get(i)) {
+          if (intOpenHashSet.contains((int) page.getLong(i))) {
+            filterBitSetUpdater.updateBitset(bitSet, i);
+          }
+        }
+      }
+    } else if (dataType == DataTypes.FLOAT) {
+      FloatOpenHashSet floatOpenHashSet = (FloatOpenHashSet) filterSet;
+      for (int i = 0; i < numberOfRows; i++) {
+        if (!nullBitSet.get(i)) {
+          if (floatOpenHashSet.contains((float) page.getDouble(i))) {
+            filterBitSetUpdater.updateBitset(bitSet, i);
+          }
+        }
+      }
+    } else if (dataType == DataTypes.DOUBLE) {
+      DoubleOpenHashSet doubleOpenHashSet = (DoubleOpenHashSet) filterSet;
+      for (int i = 0; i < numberOfRows; i++) {
+        if (!nullBitSet.get(i)) {
+          if (doubleOpenHashSet.contains(page.getDouble(i))) {
+            filterBitSetUpdater.updateBitset(bitSet, i);
+          }
+        }
+      }
+    } else if (dataType == DataTypes.LONG) {
+      LongOpenHashSet longOpenHashSet = (LongOpenHashSet) filterSet;
+      for (int i = 0; i < numberOfRows; i++) {
+        if (!nullBitSet.get(i)) {
+          if (longOpenHashSet.contains(page.getLong(i))) {
+            filterBitSetUpdater.updateBitset(bitSet, i);
+          }
+        }
+      }
+    } else if (DataTypes.isDecimal(dataType)) {
+      Set bigDecimalHashSet = (HashSet) filterSet;
+      for (int i = 0; i < numberOfRows; i++) {
+        if (!nullBitSet.get(i)) {
+          final Object measureObjectBasedOnDataType =
+              DataTypeUtil.getMeasureObjectBasedOnDataType(page, i, dataType, measure);
+          if (bigDecimalHashSet.contains(measureObjectBasedOnDataType)) {
+            filterBitSetUpdater.updateBitset(bitSet, i);
+          }
+        }
+      }
+    } else {
+      throw new IllegalArgumentException("Invalid data type");
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index cef3af1..547ed39 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -1534,7 +1534,7 @@ public final class FilterUtil {
                   converter);
         }
       }
-      msrColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
+      msrColumnExecuterInfo.setFilterKeys(keysBasedOnFilter,  measures.getDataType());
     } else {
       if (filterValues == null) {
         dimColumnExecuterInfo.setFilterKeys(new byte[0][]);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/BitSetUpdaterFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/BitSetUpdaterFactory.java
new file mode 100644
index 0000000..95bf09d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/BitSetUpdaterFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
+
+/**
+ * Class for updating the bitset
+ * If include it will set the bit
+ * If exclude it will flip the bit
+ */
+public final class BitSetUpdaterFactory {
+
+  public static final BitSetUpdaterFactory INSTANCE = new BitSetUpdaterFactory();
+
+  public FilterBitSetUpdater getBitSetUpdater(FilterExecuterType filterExecuterType) {
+    switch (filterExecuterType) {
+      case INCLUDE:
+        return new IncludeFilterBitSetUpdater();
+      case EXCLUDE:
+        return new ExcludeFilterBitSetUpdater();
+      default:
+        throw new UnsupportedOperationException(
+            "Invalid filter executor type:" + filterExecuterType);
+    }
+  }
+
+  /**
+   * Below class will be used to updating the bitset in case of include filter
+   */
+  static class IncludeFilterBitSetUpdater implements FilterBitSetUpdater {
+    @Override public void updateBitset(BitSet bitSet, int bitIndex) {
+      bitSet.set(bitIndex);
+    }
+  }
+
+  /**
+   * Below class will be used to updating the bitset in case of exclude filter
+   */
+  static class ExcludeFilterBitSetUpdater implements FilterBitSetUpdater {
+    @Override public void updateBitset(BitSet bitSet, int bitIndex) {
+      bitSet.flip(bitIndex);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index fc9fbae..36c510c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -26,7 +26,9 @@ import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterExecutorUtil;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
@@ -55,13 +57,19 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
 
   private byte[][] filterValues;
 
+  private FilterBitSetUpdater filterBitSetUpdater;
+
   public ExcludeFilterExecuterImpl(byte[][] filterValues, boolean isNaturalSorted) {
     this.filterValues = filterValues;
     this.isNaturalSorted = isNaturalSorted;
+    this.filterBitSetUpdater =
+        BitSetUpdaterFactory.INSTANCE.getBitSetUpdater(FilterExecuterType.EXCLUDE);
   }
   public ExcludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColEvaluatorInfo,
       MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo, SegmentProperties segmentProperties,
       boolean isMeasure) {
+    this.filterBitSetUpdater =
+        BitSetUpdaterFactory.INSTANCE.getBitSetUpdater(FilterExecuterType.EXCLUDE);
     this.segmentProperties = segmentProperties;
     if (!isMeasure) {
       this.dimColEvaluatorInfo = dimColEvaluatorInfo;
@@ -188,30 +196,8 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
     // the filter values. The one that matches sets it Bitset.
     BitSet bitSet = new BitSet(numerOfRows);
     bitSet.flip(0, numerOfRows);
-    Object[] filterValues = msrColumnExecutorInfo.getFilterKeys();
-    SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
-    for (int i = 0; i < filterValues.length; i++) {
-      BitSet nullBitSet = columnPage.getNullBits();
-      if (filterValues[i] == null) {
-        for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
-          bitSet.flip(j);
-        }
-        continue;
-      }
-      for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
-        if (!nullBitSet.get(startIndex)) {
-          // Check if filterValue[i] matches with measure Values.
-          Object msrValue = DataTypeUtil
-              .getMeasureObjectBasedOnDataType(columnPage, startIndex,
-                  msrType, msrColumnEvaluatorInfo.getMeasure());
-
-          if (comparator.compare(msrValue, filterValues[i]) == 0) {
-            // This is a match.
-            bitSet.flip(startIndex);
-          }
-        }
-      }
-    }
+    FilterExecutorUtil.executeIncludeExcludeFilterForMeasure(columnPage,bitSet,
+        msrColumnExecutorInfo, msrColumnEvaluatorInfo, filterBitSetUpdater);
     return bitSet;
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterBitSetUpdater.java
similarity index 79%
copy from core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
copy to core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterBitSetUpdater.java
index a19e617..39f0425 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FilterBitSetUpdater.java
@@ -16,15 +16,9 @@
  */
 package org.apache.carbondata.core.scan.filter.executer;
 
-public class MeasureColumnExecuterFilterInfo {
+import java.util.BitSet;
 
-  Object[] filterKeys;
-
-  public void setFilterKeys(Object[] filterKeys) {
-    this.filterKeys = filterKeys;
-  }
-
-  public Object[] getFilterKeys() {
-    return filterKeys;
-  }
+public interface FilterBitSetUpdater {
+  void updateBitset(BitSet bitSet, int bitIndex);
 }
+
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 64dc3a1..1231aa0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -28,7 +28,9 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterExecutorUtil;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
@@ -57,15 +59,20 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
 
   private byte[][] filterValues;
 
+  private FilterBitSetUpdater filterBitSetUpdater;
+
   public IncludeFilterExecuterImpl(byte[][] filterValues, boolean isNaturalSorted) {
     this.filterValues = filterValues;
     this.isNaturalSorted = isNaturalSorted;
+    this.filterBitSetUpdater =
+        BitSetUpdaterFactory.INSTANCE.getBitSetUpdater(FilterExecuterType.INCLUDE);
   }
 
   public IncludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColumnEvaluatorInfo,
       MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo, SegmentProperties segmentProperties,
       boolean isMeasure) {
-
+    this.filterBitSetUpdater =
+        BitSetUpdaterFactory.INSTANCE.getBitSetUpdater(FilterExecuterType.INCLUDE);
     this.segmentProperties = segmentProperties;
     if (!isMeasure) {
       this.dimColumnEvaluatorInfo = dimColumnEvaluatorInfo;
@@ -272,31 +279,8 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     // Get the measure values from the chunk. compare sequentially with the
     // the filter values. The one that matches sets it Bitset.
     BitSet bitSet = new BitSet(rowsInPage);
-    Object[] filterValues = msrColumnExecutorInfo.getFilterKeys();
-
-    SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
-    BitSet nullBitSet = columnPage.getNullBits();
-    for (int i = 0; i < filterValues.length; i++) {
-      if (filterValues[i] == null) {
-        for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
-          bitSet.set(j);
-        }
-        continue;
-      }
-      for (int startIndex = 0; startIndex < rowsInPage; startIndex++) {
-        if (!nullBitSet.get(startIndex)) {
-          // Check if filterValue[i] matches with measure Values.
-          Object msrValue = DataTypeUtil
-              .getMeasureObjectBasedOnDataType(columnPage, startIndex,
-                  msrType, msrColumnEvaluatorInfo.getMeasure());
-
-          if (comparator.compare(msrValue, filterValues[i]) == 0) {
-            // This is a match.
-            bitSet.set(startIndex);
-          }
-        }
-      }
-    }
+    FilterExecutorUtil.executeIncludeExcludeFilterForMeasure(columnPage,bitSet,
+        msrColumnExecutorInfo, msrColumnEvaluatorInfo, filterBitSetUpdater);
     return bitSet;
   }
 
@@ -525,12 +509,9 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
       }
     } else if (isMeasurePresentInCurrentBlock) {
       chunkIndex = msrColumnEvaluatorInfo.getColumnIndexInMinMaxByteArray();
-      if (isMinMaxSet[chunkIndex]) {
-        isScanRequired = isScanRequired(blkMaxVal[chunkIndex], blkMinVal[chunkIndex],
-            msrColumnExecutorInfo.getFilterKeys(), msrColumnEvaluatorInfo.getType());
-      } else {
-        isScanRequired = true;
-      }
+      isScanRequired = isScanRequired(blkMaxVal[chunkIndex], blkMinVal[chunkIndex],
+          msrColumnExecutorInfo.getFilterKeys(),
+          msrColumnEvaluatorInfo.getType());
     }
 
     if (isScanRequired) {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
index a19e617..5b5b298 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
@@ -16,15 +16,69 @@
  */
 package org.apache.carbondata.core.scan.filter.executer;
 
+import java.util.AbstractCollection;
+import java.util.HashSet;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+import it.unimi.dsi.fastutil.booleans.BooleanOpenHashSet;
+import it.unimi.dsi.fastutil.bytes.ByteOpenHashSet;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.shorts.ShortOpenHashSet;
+
+/**
+ * Below class will be used to keep all the filter values based on data type
+ * for measure column.
+ * In this class there are multiple type of set is used to avoid conversion of
+ * primitive type to primitive object to avoid gc which cause perofrmace degrade when
+ * number of records are high
+ */
 public class MeasureColumnExecuterFilterInfo {
 
   Object[] filterKeys;
 
-  public void setFilterKeys(Object[] filterKeys) {
+  /**
+   * filter set used for filtering the measure value based on data type
+   */
+  private AbstractCollection filterSet;
+
+  public void setFilterKeys(Object[] filterKeys, DataType dataType) {
     this.filterKeys = filterKeys;
+    if (dataType == DataTypes.BOOLEAN) {
+      filterSet = new BooleanOpenHashSet();
+    } else if (dataType == DataTypes.BYTE) {
+      filterSet = new ByteOpenHashSet();
+    } else if (dataType == DataTypes.SHORT) {
+      filterSet = new ShortOpenHashSet();
+    } else if (dataType == DataTypes.INT) {
+      filterSet = new IntOpenHashSet();
+    } else if (dataType == DataTypes.FLOAT) {
+      filterSet = new FloatOpenHashSet();
+    } else if (dataType == DataTypes.LONG) {
+      filterSet = new LongOpenHashSet();
+    } else if (dataType == DataTypes.DOUBLE) {
+      filterSet = new DoubleOpenHashSet();
+    } else if (DataTypes.isDecimal(dataType)) {
+      filterSet = new HashSet();
+    } else {
+      throw new IllegalArgumentException("Invalid data type");
+    }
+    for (int i = 0; i < filterKeys.length; i++) {
+      if (null != filterKeys[i]) {
+        filterSet.add(filterKeys[i]);
+      }
+    }
   }
 
   public Object[] getFilterKeys() {
     return filterKeys;
   }
+
+  public AbstractCollection getFilterSet() {
+    return filterSet;
+  }
 }
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestInFilter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestInFilter.scala
new file mode 100644
index 0000000..dd2b899
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestInFilter.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.filterexpr
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestInFilter extends QueryTest with BeforeAndAfterAll{
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists test_table")
+    sql("create table test_table(intField INT, floatField FLOAT, doubleField DOUBLE, " +
+        "decimalField DECIMAL(18,2))  stored by 'carbondata'")
+
+    // turn on  row level filter in carbon
+    // because only row level is on, 'in' will be pushdowned into CarbonScanRDD
+    //  or in filter will be handled by spark.
+    sql("set carbon.push.rowfilters.for.vector=true")
+    sql("insert into test_table values(8,8,8,8),(5,5.0,5.0,5.0),(4,1.00,2.00,3.00)," +
+        "(6,6.0000,6.0000,6.0000),(4743,4743.00,4743.0000,4743.0),(null,null,null,null)")
+  }
+
+  test("sql with in different measurement type") {
+    // the precision of filter value is less one digit than column value
+    // float type test
+    checkAnswer(
+      sql("select * from test_table where floatField in(1.0)"),
+      Seq(Row(4, 1.00, 2.00, 3.00)))
+    checkAnswer(
+      sql("select * from test_table where floatField in(4743.0)"),
+      Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+    checkAnswer(
+      sql("select * from test_table where floatField in(5)"),
+      Seq(Row(5, 5.0, 5.0, 5.0)))
+    checkAnswer(
+      sql("select * from test_table where floatField in(6.000)"),
+      Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+    // double type test
+    checkAnswer(
+      sql("select * from test_table where doubleField in(2.0)"),
+      Seq(Row(4, 1.00, 2.00, 3.00)))
+    checkAnswer(
+      sql("select * from test_table where doubleField in(4743.000)"),
+      Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+    checkAnswer(
+      sql("select * from test_table where doubleField in(5)"),
+      Seq(Row(5, 5.0, 5.0, 5.0)))
+    checkAnswer(
+      sql("select * from test_table where doubleField in(6.000)"),
+      Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+    // decimalField type test
+    checkAnswer(
+      sql("select * from test_table where decimalField in(3.0)"),
+      Seq(Row(4, 1.00, 2.00, 3.00)))
+    checkAnswer(
+      sql("select * from test_table where decimalField in(4743)"),
+      Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+    checkAnswer(
+      sql("select * from test_table where decimalField in(5)"),
+      Seq(Row(5, 5.0, 5.0, 5.0)))
+    checkAnswer(
+      sql("select * from test_table where decimalField in(6.000)"),
+      Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+    // the precision of filter value is more one digit than column value
+    // int type test
+    checkAnswer(
+      sql("select * from test_table where intField in(4.0)"),
+      Seq(Row(4, 1.00, 2.00, 3.00)))
+    checkAnswer(
+      sql("select * from test_table where intField in(4743.0)"),
+      Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+    checkAnswer(
+      sql("select * from test_table where intField in(5.0)"),
+      Seq(Row(5, 5.0, 5.0, 5.0)))
+    checkAnswer(
+      sql("select * from test_table where intField in(6.0)"),
+      Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+    // float type test
+    checkAnswer(
+      sql("select * from test_table where floatField in(1.000)"),
+      Seq(Row(4, 1.00, 2.00, 3.00)))
+    checkAnswer(
+      sql("select * from test_table where floatField in(4743.000)"),
+      Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+    checkAnswer(
+      sql("select * from test_table where floatField in(5.00)"),
+      Seq(Row(5, 5.0, 5.0, 5.0)))
+    checkAnswer(
+      sql("select * from test_table where floatField in(6.00000)"),
+      Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+    // double type test
+    checkAnswer(
+      sql("select * from test_table where doubleField in(2.000)"),
+      Seq(Row(4, 1.00, 2.00, 3.00)))
+    checkAnswer(
+      sql("select * from test_table where doubleField in(4743.00000)"),
+      Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+    checkAnswer(
+      sql("select * from test_table where doubleField in(5.00)"),
+      Seq(Row(5, 5.0, 5.0, 5.0)))
+    checkAnswer(
+      sql("select * from test_table where doubleField in(6.00000)"),
+      Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+    // decimalField type test
+    checkAnswer(
+      sql("select * from test_table where decimalField in(3.000)"),
+      Seq(Row(4, 1.00, 2.00, 3.00)))
+    checkAnswer(
+      sql("select * from test_table where decimalField in(4743.00)"),
+      Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+    checkAnswer(
+      sql("select * from test_table where decimalField in(5.00)"),
+      Seq(Row(5, 5.0, 5.0, 5.0)))
+    checkAnswer(
+      sql("select * from test_table where decimalField in(6.00000)"),
+      Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+
+    // case: filter value is null
+    checkAnswer(
+      sql("select * from test_table where decimalField is null"),
+      Seq(Row(null, null, null, null)))
+
+    // filter value and column 's precision are the same
+    checkAnswer(
+      sql("select * from test_table where doubleField in(5.0) " +
+          "and floatField in(5.0) and decimalField in(5.0) and intField in(5)"),
+      Seq(Row(5, 5.0, 5.0, 5.0)))
+    checkAnswer(
+      sql("select * from test_table where doubleField in(6.0000) " +
+          "and floatField in(6.0000) and decimalField in(6.0000) and intField in(6.0000)"),
+      Seq(Row(6, 6.0000, 6.0000, 6.0000)))
+    checkAnswer(
+      sql("select * from test_table where doubleField in(8) " +
+          "and floatField in(8) and decimalField in(8) and intField in(8)"),
+      Seq(Row(8, 8, 8, 8)))
+    checkAnswer(
+      sql("select * from test_table where doubleField in(4743.0000) " +
+          "and floatField in(4743.00) and decimalField in(4743.0) and intField in(4743)"),
+      Seq(Row(4743, 4743.00, 4743.0000, 4743.0)))
+    checkAnswer(
+      sql("select * from test_table where doubleField in(2.00) " +
+          "and floatField in(1.00) and decimalField in(3.00) and intField in(4)"),
+      Seq(Row(4, 1.00, 2.00, 3.00)))
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists test_table")
+    sql("set carbon.push.rowfilters.for.vector=false")
+  }
+
+}