Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/09/10 15:03:56 UTC

[carbondata] branch master updated: [CARBONDATA-3961] Reorder filter expression based on storage ordinal

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b57d17b  [CARBONDATA-3961] Reorder filter expression based on storage ordinal
b57d17b is described below

commit b57d17be8d48781cf590cb301aff3062a2181bcb
Author: kunal642 <ku...@gmail.com>
AuthorDate: Wed Aug 26 13:43:47 2020 +0530

    [CARBONDATA-3961] Reorder filter expression based on storage ordinal
    
    Why is this PR needed?
    Currently the filter is executed in the user-specified order, which may differ from the storage order. This can cause many backward seeks in cloud storage solutions, resulting in slower performance.
    
    What changes were proposed in this PR?
    Reorder the filter according to the column storage ordinal (see the illustrative sketch after this message).
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3902
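
    For illustration (not part of the commit): the reordering boils down to
    sorting sibling predicates by the ordinal at which the referenced column
    is physically stored, so a scan moves forward through the file instead
    of seeking backward. A minimal sketch, assuming a hypothetical ordinalOf
    lookup; the real change resolves ordinals through CarbonTable in
    CarbonFilters.getStorageOrdinal in the diff below:

        import org.apache.spark.sql.sources.{EqualTo, Filter}

        // Hypothetical storage layout: column name -> storage ordinal.
        val ordinalOf = Map("one" -> 0, "two" -> 1, "four" -> 3)

        // Predicates whose column is unknown sort first (ordinal -1),
        // mirroring getStorageOrdinal in the diff below.
        def reorderByOrdinal(filters: Array[Filter]): Array[Filter] =
          filters.sortBy(_.references.headOption.flatMap(ordinalOf.get).getOrElse(-1))

        // EqualTo("four", 11) now evaluates after EqualTo("one", 11).
        reorderByOrdinal(Array(EqualTo("four", 11), EqualTo("one", 11)))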
---
 .../core/constants/CarbonCommonConstants.java      |   6 +
 .../carbondata/core/util/CarbonProperties.java     |   9 +-
 .../apache/carbondata/core/util/SessionParams.java |   2 +
 docs/configuration-parameters.md                   |   2 +-
 .../org/apache/carbondata/geo/InPolygonUDF.scala   |   2 +-
 .../org/apache/carbondata/index/TextMatchUDF.scala |   4 +-
 .../apache/spark/sql/CarbonBoundReference.scala    |  14 +-
 .../spark/sql/CarbonDatasourceHadoopRelation.scala |   5 +-
 .../execution/command/CarbonHiveCommands.scala     |  11 +-
 .../apache/spark/sql/optimizer/CarbonFilters.scala | 150 ++++++++++++++++++++-
 .../carbondata/query/TestFilterReordering.scala    |  75 +++++++++++
 11 files changed, 262 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ca8e171..c498c20 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2557,4 +2557,10 @@ public final class CarbonCommonConstants {
   public static final String COMPLEX_DELIMITER_LEVEL_3_DEFAULT = "@";
 
   public static final String FILE_HEADER = "fileHeader";
+
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String CARBON_REORDER_FILTER = "carbon.reorder.filter";
+
+  public static final String CARBON_REORDER_FILTER_DEFAULT = "true";
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index d0077ec..462e459 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -2105,7 +2105,7 @@ public final class CarbonProperties {
     // Check if user has enabled/disabled the use of property for the current db and table using
     // the set command
     String thresholdValue = getSessionPropertyValue(
-            CarbonCommonConstants.CARBON_LOAD_SI_REPAIR + "." + dbName + "." + tableName);
+        CarbonCommonConstants.CARBON_LOAD_SI_REPAIR + "." + dbName + "." + tableName);
     if (thresholdValue == null) {
       // if not set in session properties then check carbon.properties for the same.
       thresholdValue = getProperty(CarbonCommonConstants.CARBON_SI_REPAIR_LIMIT);
@@ -2115,4 +2115,11 @@ public final class CarbonProperties {
     }
     return Math.abs(Integer.parseInt(thresholdValue));
   }
+
+  public static boolean isFilterReorderingEnabled() {
+    return Boolean.parseBoolean(
+        getInstance().getProperty(CarbonCommonConstants.CARBON_REORDER_FILTER,
+        CarbonCommonConstants.CARBON_REORDER_FILTER_DEFAULT)
+    );
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 233dbe0..0fd4e82 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -227,6 +227,8 @@ public class SessionParams implements Serializable, Cloneable {
             throw new InvalidConfigurationException("The sort scope " + key
                 + " can have only either NO_SORT, LOCAL_SORT or GLOBAL_SORT.");
           }
+        } else if (key.equalsIgnoreCase(CARBON_REORDER_FILTER)) {
+          isValid = true;
         } else {
           throw new InvalidConfigurationException(
               "The key " + key + " not supported for dynamic configuration.");
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 634d607..38165fd 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -230,7 +230,7 @@ RESET
 | carbon.index.visible.<db_name>.<table_name>.<index_name> | To specify query on ***db_name.table_name*** to not use the index ***index_name***. |
 | carbon.load.indexes.parallel.<db_name>.<table_name> | To enable parallel index loading for a table. when db_name.table_name are not specified, i.e., when ***carbon.load.indexes.parallel.*** is set, it applies for all the tables of the session. |
 | carbon.enable.index.server                | To use index server for caching and pruning. This property can be used for a session or for a particular table with ***carbon.enable.index.server.<db_name>.<table_name>***. |
-
+| carbon.reorder.filter                     | This property can be used to enable/disable filter reordering. Should be disabled only when the user has already optimized the filter condition. |
 **Examples:**
 
 * Add or Update:
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala b/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
index 1f419ed..b73bf04 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
@@ -30,6 +30,6 @@ class InPolygonUDF extends (String => Boolean) with Serializable {
 
 @InterfaceAudience.Internal
 case class InPolygon(queryString: String) extends Filter {
-  override def references: Array[String] = null
+  override def references: Array[String] = Array()
 }
 
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/index/TextMatchUDF.scala b/integration/spark/src/main/scala/org/apache/carbondata/index/TextMatchUDF.scala
index 0435579..590fa7a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/index/TextMatchUDF.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/index/TextMatchUDF.scala
@@ -37,10 +37,10 @@ class TextMatchMaxDocUDF extends ((String, Int) => Boolean) with Serializable {
 
 @InterfaceAudience.Internal
 case class TextMatch(queryString: String) extends Filter {
-  override def references: Array[String] = null
+  override def references: Array[String] = Array()
 }
 
 @InterfaceAudience.Internal
 case class TextMatchLimit(queryString: String, maxDoc: String) extends Filter {
-  override def references: Array[String] = null
+  override def references: Array[String] = Array()
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
index 78d6a46..e95ce34 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
@@ -20,18 +20,24 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.sources.Filter
 
+object ExtractReferences {
+  def apply(expr: Expression): Array[String] = {
+    expr.references.map(_.name).toArray
+  }
+}
+
 case class CastExpr(expr: Expression) extends Filter {
-  override def references: Array[String] = null
+  override def references: Array[String] = ExtractReferences(expr)
 }
 
 case class FalseExpr() extends Filter {
-  override def references: Array[String] = null
+  override def references: Array[String] = Array.empty
 }
 
 case class CarbonEndsWith(expr: Expression) extends Filter {
-  override def references: Array[String] = null
+  override def references: Array[String] = ExtractReferences(expr)
 }
 
 case class CarbonContainsWith(expr: Expression) extends Filter {
-  override def references: Array[String] = null
+  override def references: Array[String] = ExtractReferences(expr)
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index b3d17f5..b0b0742 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
 import org.apache.spark.sql.execution.strategy.PushDownHelper
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
+import org.apache.spark.sql.sources.{And, BaseRelation, Filter, InsertableRelation, Or}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
 
@@ -72,7 +72,8 @@ case class CarbonDatasourceHadoopRelation(
       projects: Seq[NamedExpression],
       filters: Array[Filter],
       partitions: Seq[PartitionSpec]): RDD[InternalRow] = {
-    val filterExpression: Option[Expression] = filters.flatMap { filter =>
+    val reorderedFilter = CarbonFilters.reorderFilter(filters, carbonTable)
+    val filterExpression: Option[Expression] = reorderedFilter.flatMap { filter =>
       CarbonFilters.createCarbonFilter(schema, filter,
         carbonTable.getTableInfo.getFactTable.getTableProperties.asScala)
     }.reduceOption(new AndExpression(_, _))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 124bd63..23f74d3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -99,8 +99,7 @@ object CarbonSetCommand {
     if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
       if (key.split("\\.").length == 5) {
         sessionParams.addProperty(key.toLowerCase(), value)
-      }
-      else {
+      } else {
         throw new MalformedCarbonCommandException(
           "property should be in \" carbon.input.segments.<database_name>" +
           ".<table_name>=<seg_id list> \" format.")
@@ -121,8 +120,7 @@ object CarbonSetCommand {
     } else if (key.startsWith(CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE)) {
       if (key.split("\\.").length == 7) {
         sessionParams.addProperty(key.toLowerCase(), value)
-      }
-      else {
+      } else {
         throw new MalformedCarbonCommandException(
           "property should be in \" carbon.table.load.sort.scope.<database_name>" +
           ".<table_name>=<sort_scope> \" format.")
@@ -132,8 +130,9 @@ object CarbonSetCommand {
       if (keySplits.length == 6 || keySplits.length == 4) {
         sessionParams.addProperty(key.toString, value)
       }
-    }
-    else if (isCarbonProperty) {
+    } else if (key.equalsIgnoreCase(CarbonCommonConstants.CARBON_REORDER_FILTER)) {
+      sessionParams.addProperty(key, value)
+    } else if (isCarbonProperty) {
       sessionParams.addProperty(key, value)
     }
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index cf9ca42..e8a8c3a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSour
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.hive.{CarbonHiveIndexMetadataUtil, CarbonSessionCatalogUtil}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 import org.apache.spark.util.CarbonReflectionUtils
 
@@ -35,7 +36,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression, MatchExpression}
+import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression,
+   Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression, MatchExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
@@ -373,6 +375,152 @@ object CarbonFilters {
     val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession)
     getPartitions(partitionFilters, sparkSession, carbonTable)
   }
+
+  def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = {
+    val column = filter.references.map(carbonTable.getColumnByName)
+    if (column.isEmpty) {
+      -1
+    } else {
+      if (column.head.isDimension) {
+        column.head.getOrdinal
+      } else {
+        column.head.getOrdinal + carbonTable.getAllDimensions.size()
+      }
+    }
+  }
+
+  def collectSimilarExpressions(filter: Filter, table: CarbonTable): Seq[(Filter, Int)] = {
+    filter match {
+      case sources.And(left, right) =>
+        collectSimilarExpressions(left, table) ++ collectSimilarExpressions(right, table)
+      case sources.Or(left, right) => collectSimilarExpressions(left, table) ++
+                              collectSimilarExpressions(right, table)
+      case others => Seq((others, getStorageOrdinal(others, table)))
+    }
+  }
+
+  /**
+   * This method will reorder the filter based on the Storage Ordinal of the column references.
+   *
+   * Example1:
+   *             And                                   And
+   *      Or          And             =>        Or            And
+   *  col3  col1  col2  col1                col1  col3    col1   col2
+   *
+   *  **Mixed expression filters are reordered locally, but won't be reordered globally.**
+   *
+   * Example2:
+   *             And                                   And
+   *      And          And           =>       And            And
+   *  col3  col1  col2  col1                col1  col1    col2   col3
+   *
+   *             Or                                    Or
+   *       Or          Or             =>        Or            Or
+   *   col3  col1  col2  col1               col1  col1    col2   col3
+   *
+   *  **Similar expression filters are reordered globally**
+   *
+   * @param filters the filter expressions to be reordered
+   * @return the reordered filter expressions, sorted by storage ordinal
+   */
+  def reorderFilter(filters: Array[Filter], table: CarbonTable): Array[Filter] = {
+    val filterMap = mutable.HashMap[String, List[(Filter, Int)]]()
+    if (!CarbonProperties.isFilterReorderingEnabled) {
+      filters
+    } else {
+      filters.collect {
+        // If the filter references only a single column then there is no need to reorder it.
+        case filter if filter.references.toSet.size == 1 =>
+          (filter, getStorageOrdinal(filter, table))
+        case filter =>
+          val sortedFilter = sortFilter(filter, filterMap, table)
+          // If filter has only AND/OR expression then sort the nodes globally using the filterMap.
+          // Else sort the subnodes individually
+          if (!filterMap.contains("OR") && filterMap.contains("AND") && filterMap("AND").nonEmpty) {
+            val sortedFilterAndOrdinal = filterMap("AND").sortBy(_._2)
+            (sortedFilterAndOrdinal.map(_._1).reduce(sources.And), sortedFilterAndOrdinal.head._2)
+          } else if (!filterMap.contains("AND") && filterMap.contains("OR") &&
+                     filterMap("OR").nonEmpty) {
+            val sortedFilterAndOrdinal = filterMap("OR").sortBy(_._2)
+            (sortedFilterAndOrdinal.map(_._1).reduce(sources.Or), sortedFilterAndOrdinal.head._2)
+          } else {
+            sortedFilter
+          }
+      }.sortBy(_._2).map(_._1)
+    }
+  }
+
+  def generateNewFilter(filterType: String, left: Filter, right: Filter,
+      filterMap: mutable.HashMap[String, List[(Filter, Int)]],
+      table: CarbonTable): (Filter, Int) = {
+    filterMap.getOrElseUpdate(filterType, List())
+    // Generate a function which can handle both AND/OR.
+    val newFilter: (Filter, Filter) => Filter = filterType match {
+      case "OR" => sources.Or
+      case _ => sources.And
+    }
+    if (checkIfRightIsASubsetOfLeft(left, right)) {
+      val (sorted, ordinal) = sortFilter(left, filterMap, table)
+      val rightOrdinal = getStorageOrdinal(right, table)
+      val orderedFilter = if (ordinal >= rightOrdinal) {
+        (newFilter(right, sorted), rightOrdinal)
+      } else {
+        (newFilter(sorted, right), ordinal)
+      }
+      if (isLeafNode(left)) {
+        filterMap.put(filterType, filterMap(filterType) ++ List((sorted, ordinal)))
+      }
+      if (isLeafNode(right)) {
+        filterMap.put(filterType, filterMap(filterType) ++ List((right, rightOrdinal)))
+      }
+      orderedFilter
+    } else {
+      val (leftSorted, leftOrdinal) = sortFilter(left, filterMap, table)
+      val (rightSorted, rightOrdinal) = sortFilter(right, filterMap, table)
+      val orderedFilter = if (leftOrdinal > rightOrdinal) {
+        (newFilter(rightSorted, leftSorted), rightOrdinal)
+      } else {
+        (newFilter(leftSorted, rightSorted), leftOrdinal)
+      }
+      if (isLeafNode(left)) {
+        filterMap.put(filterType, filterMap(filterType) ++ List((leftSorted, leftOrdinal)))
+      }
+      if (isLeafNode(right)) {
+        filterMap.put(filterType, filterMap(filterType) ++ List((rightSorted, rightOrdinal)))
+      }
+      orderedFilter
+    }
+  }
+
+  def sortFilter(filter: Filter, filterMap: mutable.HashMap[String, List[(Filter, Int)]],
+      table: CarbonTable): (Filter, Int) = {
+    filter match {
+      case sources.And(left, right) =>
+        generateNewFilter("AND", left, right, filterMap, table)
+      case sources.Or(left, right) =>
+        generateNewFilter("OR", left, right, filterMap, table)
+      case others => (others, getStorageOrdinal(others, table))
+    }
+  }
+
+  /**
+   * Checks if the filter node is a leaf node, i.e. a node that is neither AND nor OR.
+   *
+   * @return true if leaf node, false otherwise
+   */
+  private def isLeafNode(filter: Filter): Boolean = {
+    !filter.isInstanceOf[sources.Or] && !filter.isInstanceOf[sources.And]
+  }
+
+  /**
+   * Checks if the references in right subtree of a filter are a subset of the left references.
+   * @return true if right is a subset, otherwise false
+   */
+  private def checkIfRightIsASubsetOfLeft(left: Filter, right: Filter): Boolean = {
+    left.references.toSeq == right.references.toSeq ||
+    right.references.diff(left.references).length == 0
+  }
+
   /**
    * Fetches partition information from hive
    * @param partitionFilters
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/query/TestFilterReordering.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/query/TestFilterReordering.scala
new file mode 100644
index 0000000..3cee00e
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/query/TestFilterReordering.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata.query
+
+import org.apache.spark.sql.{CarbonEnv, CarbonUtils}
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.sources.{And, EqualTo, Filter, Or}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+class TestFilterReordering extends QueryTest with BeforeAndAfterAll {
+
+  override protected def beforeAll(): Unit = {
+    sql("drop table if exists filter_reorder")
+    sql("create table filter_reorder(one string, two string, three string, four int, " +
+        "five int) stored as carbondata")
+  }
+
+  test("Test filter reorder with various conditions") {
+    val filter1 = Or(And(EqualTo("four", 11), EqualTo("two", 11)), EqualTo("one", 11))
+    val table = CarbonEnv.getCarbonTable(None, "filter_reorder")(sqlContext.sparkSession)
+    var d: Array[Filter] = CarbonFilters.reorderFilter(Array(filter1), table)
+    assert(d.head.references.sameElements(Array("one", "two", "four")))
+
+    val filter2 = Or(Or(EqualTo("four", 11), EqualTo("two", 11)),
+      Or(EqualTo("one", 11), Or(EqualTo("five", 11), EqualTo("three", 11))))
+    d = CarbonFilters.reorderFilter(Array(filter2), table)
+    assert(d.head.references.sameElements(Array("one", "two", "three", "four", "five")))
+
+    val filter3 = Or(Or(EqualTo("four", 11), EqualTo("two", 11)),
+      Or(EqualTo("one", 11), Or(EqualTo("five", 11),
+        And(EqualTo("three", 11), EqualTo("three", 11)))))
+    d = CarbonFilters.reorderFilter(Array(filter3), table)
+    assert(d.head.references.sameElements(Array("one", "three", "three", "five", "two", "four")))
+
+    val filter4: Array[Filter] = Array(EqualTo("four", 11), EqualTo("two", 11), EqualTo("one", 11),
+      EqualTo("five", 11), EqualTo("three", 11))
+    d = CarbonFilters.reorderFilter(filter4, table)
+    assert(d.map(_.references.head) sameElements
+           Array("one", "two", "three", "four", "five"))
+  }
+
+  test("test disabling filter reordering") {
+    sqlContext.sparkSession.sql(s"set ${CarbonCommonConstants.CARBON_REORDER_FILTER}=false")
+    CarbonUtils.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
+    val filter1 = Or(And(EqualTo("four", 11), EqualTo("two", 11)), EqualTo("one", 11))
+    val table = CarbonEnv.getCarbonTable(None, "filter_reorder")(sqlContext.sparkSession)
+    val d: Array[Filter] = CarbonFilters.reorderFilter(Array(filter1), table)
+    assert(d.head.references.sameElements(Array("four", "two", "one")))
+    sqlContext.sparkSession.sql(s"set ${CarbonCommonConstants.CARBON_REORDER_FILTER}=true")
+  }
+
+  override protected def afterAll(): Unit = {
+    sqlContext.sparkSession.sql(s"set ${CarbonCommonConstants.CARBON_REORDER_FILTER}=true")
+    CarbonUtils.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
+    sql("drop table if exists filter_reorder")
+  }
+}