Posted to commits@hbase.apache.org by te...@apache.org on 2015/10/16 20:22:17 UTC

[2/5] hbase git commit: HBASE-14406 The dataframe datasource filter is wrong, and will result in data loss or unexpected behavior (Ted Malaska)

http://git-wip-us.apache.org/repos/asf/hbase/blob/dae1775a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index a180de2..23480bb 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.ConcurrentLinkedQueue
 
 import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result, Scan}
 import org.apache.hadoop.hbase.types._
-import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange, PositionedByteRange, Bytes}
+import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange,
+PositionedByteRange, Bytes}
 import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
@@ -44,7 +45,7 @@ import scala.collection.mutable
  * - Type conversions of basic SQL types.  All conversions will be
 *   done through the HBase Bytes object commands.
  */
-class DefaultSource extends RelationProvider {
+class DefaultSource extends RelationProvider with Logging {
 
   val TABLE_KEY:String = "hbase.table"
   val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping"
@@ -52,6 +53,7 @@ class DefaultSource extends RelationProvider {
   val CACHING_NUM_KEY:String = "hbase.caching.num"
   val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources"
   val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context"
+  val PUSH_DOWN_COLUMN_FILTER:String = "hbase.push.down.column.filter"
 
   /**
    * Is given input from SparkSQL to construct a BaseRelation
@@ -73,6 +75,7 @@ class DefaultSource extends RelationProvider {
     val cachingNumStr = parameters.getOrElse(CACHING_NUM_KEY, "1000")
     val hbaseConfigResources = parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "")
     val useHBaseReources = parameters.getOrElse(USE_HBASE_CONTEXT, "true")
+    val usePushDownColumnFilter = parameters.getOrElse(PUSH_DOWN_COLUMN_FILTER, "true")
 
     val batchingNum:Int = try {
       batchingNumStr.toInt
@@ -95,7 +98,8 @@ class DefaultSource extends RelationProvider {
       batchingNum.toInt,
       cachingNum.toInt,
       hbaseConfigResources,
-      useHBaseReources.equalsIgnoreCase("true"))(sqlContext)
+      useHBaseReources.equalsIgnoreCase("true"),
+      usePushDownColumnFilter.equalsIgnoreCase("true"))(sqlContext)
   }
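
The new hbase.push.down.column.filter option defaults to on. A hedged sketch of
turning the push down off through the datasource options, mirroring the
sqlContext.load calls in the test suite further below:

    val df = sqlContext.load("org.apache.hadoop.hbase.spark",
      Map("hbase.columns.mapping" ->
        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
        "hbase.table" -> "t1",
        "hbase.push.down.column.filter" -> "false")) // fall back to Spark-side filtering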
 
   /**
@@ -161,7 +165,8 @@ class HBaseRelation (val tableName:String,
                      val batchingNum:Int,
                      val cachingNum:Int,
                      val configResources:String,
-                     val useHBaseContext:Boolean) (
+                     val useHBaseContext:Boolean,
+                     val usePushDownColumnFilter:Boolean) (
   @transient val sqlContext:SQLContext)
   extends BaseRelation with PrunedFilteredScan with Logging {
 
@@ -217,151 +222,158 @@ class HBaseRelation (val tableName:String,
    */
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
 
-    val columnFilterCollection = buildColumnFilterCollection(filters)
+    val pushDownTuple = buildPushDownPredicatesResource(filters)
+    val pushDownRowKeyFilter = pushDownTuple._1
+    var pushDownDynamicLogicExpression = pushDownTuple._2
+    val valueArray = pushDownTuple._3
+
+    if (!usePushDownColumnFilter) {
+      pushDownDynamicLogicExpression = null
+    }
+
+    logDebug("pushDownRowKeyFilter:           " + pushDownRowKeyFilter.ranges)
+    if (pushDownDynamicLogicExpression != null) {
+      logDebug("pushDownDynamicLogicExpression: " +
+        pushDownDynamicLogicExpression.toExpressionString)
+    }
+    logDebug("valueArray:                     " + valueArray.length)
+
+    val requiredQualifierDefinitionList =
+      new mutable.MutableList[SchemaQualifierDefinition]
 
-    val requiredQualifierDefinitionArray = new mutable.MutableList[SchemaQualifierDefinition]
     requiredColumns.foreach( c => {
       val definition = schemaMappingDefinition.get(c)
-      if (definition.columnFamilyBytes.length > 0) {
-        requiredQualifierDefinitionArray += definition
-      }
+      requiredQualifierDefinitionList += definition
     })
 
     //Create a local variable so that Scala doesn't have to
     // serialize the whole HBaseRelation object
     val serializableDefinitionMap = schemaMappingDefinition
 
-
     //retain the information for unit testing checks
-    DefaultSourceStaticUtils.populateLatestExecutionRules(columnFilterCollection,
-      requiredQualifierDefinitionArray)
+    DefaultSourceStaticUtils.populateLatestExecutionRules(pushDownRowKeyFilter,
+      pushDownDynamicLogicExpression)
     var resultRDD: RDD[Row] = null
 
-    if (columnFilterCollection != null) {
-      val pushDownFilterJava =
-        new SparkSQLPushDownFilter(
-          columnFilterCollection.generateFamilyQualifiterFilterMap(schemaMappingDefinition))
-
-      val getList = new util.ArrayList[Get]()
-      val rddList = new util.ArrayList[RDD[Row]]()
-
-      val it = columnFilterCollection.columnFilterMap.iterator
-      while (it.hasNext) {
-        val e = it.next()
-        val columnDefinition = schemaMappingDefinition.get(e._1)
-        //check is a rowKey
-        if (columnDefinition != null && columnDefinition.columnFamily.isEmpty) {
-          //add points to getList
-          e._2.points.foreach(p => {
-            val get = new Get(p)
-            requiredQualifierDefinitionArray.foreach( d =>
-              get.addColumn(d.columnFamilyBytes, d.qualifierBytes))
-            getList.add(get)
-          })
+    val getList = new util.ArrayList[Get]()
+    val rddList = new util.ArrayList[RDD[Row]]()
 
-          val rangeIt = e._2.ranges.iterator
+    //add points to getList
+    pushDownRowKeyFilter.points.foreach(p => {
+      val get = new Get(p)
+      requiredQualifierDefinitionList.foreach( d => {
+        if (d.columnFamilyBytes.length > 0)
+          get.addColumn(d.columnFamilyBytes, d.qualifierBytes)
+      })
+      getList.add(get)
+    })
 
-          while (rangeIt.hasNext) {
-            val r = rangeIt.next()
+    val rangeIt = pushDownRowKeyFilter.ranges.iterator
 
-            val scan = new Scan()
-            scan.setBatch(batchingNum)
-            scan.setCaching(cachingNum)
-            requiredQualifierDefinitionArray.foreach( d =>
-              scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
+    while (rangeIt.hasNext) {
+      val r = rangeIt.next()
 
-            if (pushDownFilterJava.columnFamilyQualifierFilterMap.size() > 0) {
-              scan.setFilter(pushDownFilterJava)
-            }
+      val scan = new Scan()
+      scan.setBatch(batchingNum)
+      scan.setCaching(cachingNum)
+      requiredQualifierDefinitionList.foreach( d =>
+        if (d.columnFamilyBytes.length > 0)
+          scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
 
-            //Check if there is a lower bound
-            if (r.lowerBound != null && r.lowerBound.length > 0) {
-
-              if (r.isLowerBoundEqualTo) {
-                //HBase startRow is inclusive: Therefore it acts like  isLowerBoundEqualTo
-                // by default
-                scan.setStartRow(r.lowerBound)
-              } else {
-                //Since we don't equalTo we want the next value we need
-                // to add another byte to the start key.  That new byte will be
-                // the min byte value.
-                val newArray = new Array[Byte](r.lowerBound.length + 1)
-                System.arraycopy(r.lowerBound, 0, newArray, 0, r.lowerBound.length)
-
-                //new Min Byte
-                newArray(r.lowerBound.length) = Byte.MinValue
-                scan.setStartRow(newArray)
-              }
-            }
+      if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
+        val pushDownFilterJava =
+          new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
+            valueArray, requiredQualifierDefinitionList)
 
-            //Check if there is a upperBound
-            if (r.upperBound != null && r.upperBound.length > 0) {
-              if (r.isUpperBoundEqualTo) {
-                //HBase stopRow is exclusive: therefore it DOESN'T ast like isUpperBoundEqualTo
-                // by default.  So we need to add a new max byte to the stopRow key
-                val newArray = new Array[Byte](r.upperBound.length + 1)
-                System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length)
-
-                //New Max Bytes
-                newArray(r.upperBound.length) = Byte.MaxValue
-
-                scan.setStopRow(newArray)
-              } else {
-                //Here equalTo is false for Upper bound which is exclusive and
-                // HBase stopRow acts like that by default so no need to mutate the
-                // rowKey
-                scan.setStopRow(r.upperBound)
-              }
-            }
+        scan.setFilter(pushDownFilterJava)
+      }
 
-            val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
-              Row.fromSeq(requiredColumns.map(c =>
-                DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r._2)))
-            })
-            rddList.add(rdd)
-          }
+      //Check if there is a lower bound
+      if (r.lowerBound != null && r.lowerBound.length > 0) {
+
+        if (r.isLowerBoundEqualTo) {
+          //HBase startRow is inclusive: therefore it acts like isLowerBoundEqualTo
+          // by default
+          scan.setStartRow(r.lowerBound)
+        } else {
+          //Since this is not an equalTo, we want the next value, so we need
+          // to add another byte to the start key.  That new byte will be
+          // the min byte value.
+          val newArray = new Array[Byte](r.lowerBound.length + 1)
+          System.arraycopy(r.lowerBound, 0, newArray, 0, r.lowerBound.length)
+
+          //new Min Byte
+          newArray(r.lowerBound.length) = Byte.MinValue
+          scan.setStartRow(newArray)
         }
       }
 
-      //If there is more then one RDD then we have to union them together
-      for (i <- 0 until rddList.size()) {
-        if (resultRDD == null) resultRDD = rddList.get(i)
-        else resultRDD = resultRDD.union(rddList.get(i))
-
+      //Check if there is an upperBound
+      if (r.upperBound != null && r.upperBound.length > 0) {
+        if (r.isUpperBoundEqualTo) {
+          //HBase stopRow is exclusive: therefore it DOESN'T act like isUpperBoundEqualTo
+          // by default.  So we need to add a new max byte to the stopRow key
+          val newArray = new Array[Byte](r.upperBound.length + 1)
+          System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length)
+
+          //New Max Bytes
+          newArray(r.upperBound.length) = Byte.MaxValue
+          scan.setStopRow(newArray)
+        } else {
+          //Here equalTo is false for the upper bound, which is exclusive, and
+          // HBase stopRow acts like that by default, so there is no need to mutate the
+          // rowKey
+          scan.setStopRow(r.upperBound)
+        }
       }
 
-      //If there are gets then we can get them from the driver and union that rdd in
-      // with the rest of the values.
-      if (getList.size() > 0) {
-        val connection = ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration)
+      val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
+        Row.fromSeq(requiredColumns.map(c =>
+          DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r._2)))
+      })
+      rddList.add(rdd)
+    }
+
+    //If there is more than one RDD then we have to union them together
+    for (i <- 0 until rddList.size()) {
+      if (resultRDD == null) resultRDD = rddList.get(i)
+      else resultRDD = resultRDD.union(rddList.get(i))
+
+    }
+
+    //If there are gets then we can fetch them from the driver and union that rdd in
+    // with the rest of the values.
+    if (getList.size() > 0) {
+      val connection =
+        ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration)
+      try {
+        val table = connection.getTable(TableName.valueOf(tableName))
         try {
-          val table = connection.getTable(TableName.valueOf(tableName))
-          try {
-            val results = table.get(getList)
-            val rowList = mutable.MutableList[Row]()
-            for (i <- 0 until results.length) {
-              val rowArray = requiredColumns.map(c =>
-                DefaultSourceStaticUtils.getValue(c, schemaMappingDefinition, results(i)))
-              rowList += Row.fromSeq(rowArray)
-            }
-            val getRDD = sqlContext.sparkContext.parallelize(rowList)
-            if (resultRDD == null) resultRDD = getRDD
-            else {
-              resultRDD = resultRDD.union(getRDD)
-            }
-          } finally {
-            table.close()
+          val results = table.get(getList)
+          val rowList = mutable.MutableList[Row]()
+          for (i <- 0 until results.length) {
+            val rowArray = requiredColumns.map(c =>
+              DefaultSourceStaticUtils.getValue(c, schemaMappingDefinition, results(i)))
+            rowList += Row.fromSeq(rowArray)
+          }
+          val getRDD = sqlContext.sparkContext.parallelize(rowList)
+          if (resultRDD == null) resultRDD = getRDD
+          else {
+            resultRDD = resultRDD.union(getRDD)
           }
         } finally {
-          connection.close()
+          table.close()
         }
+      } finally {
+        connection.close()
       }
     }
+
     if (resultRDD == null) {
       val scan = new Scan()
       scan.setBatch(batchingNum)
       scan.setCaching(cachingNum)
-      requiredQualifierDefinitionArray.foreach( d =>
+      requiredQualifierDefinitionList.foreach( d =>
         scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
 
       val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
@@ -373,79 +385,135 @@ class HBaseRelation (val tableName:String,
     resultRDD
   }
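
The two bound adjustments in buildScan map SparkSQL's comparison semantics onto
HBase's [startRow inclusive, stopRow exclusive) scan contract. A minimal sketch
of that mapping in idiomatic Scala; the helper names are illustrative, not part
of the patch:

    // A strict lower bound (>) appends the min byte so the bound itself is skipped;
    // an inclusive upper bound (<=) appends the max byte so the bound itself is kept.
    def strictStartRow(lowerBound: Array[Byte]): Array[Byte] = lowerBound :+ Byte.MinValue
    def inclusiveStopRow(upperBound: Array[Byte]): Array[Byte] = upperBound :+ Byte.MaxValue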
 
-  /**
-   * Root recursive function that will loop over the filters provided by
-   * SparkSQL.  Some filters are AND or OR functions and contain additional filters
-   * hence the need for recursion.
-   *
-   * @param filters Filters provided by SparkSQL.
-   *                Filters are joined with the AND operater
-   * @return        A ColumnFilterCollection whish is a consolidated construct to
-   *                hold the high level filter information
-   */
-  def buildColumnFilterCollection(filters: Array[Filter]): ColumnFilterCollection = {
-    var superCollection: ColumnFilterCollection = null
+  def buildPushDownPredicatesResource(filters: Array[Filter]):
+  (RowKeyFilter, DynamicLogicExpression, Array[Array[Byte]]) = {
+    var superRowKeyFilter:RowKeyFilter = null
+    val queryValueList = new mutable.MutableList[Array[Byte]]
+    var superDynamicLogicExpression: DynamicLogicExpression = null
 
     filters.foreach( f => {
-      val parentCollection = new ColumnFilterCollection
-      buildColumnFilterCollection(parentCollection, f)
-      if (superCollection == null)
-        superCollection = parentCollection
-      else
-        superCollection.mergeIntersect(parentCollection)
+      val rowKeyFilter = new RowKeyFilter()
+      val logicExpression = transverseFilterTree(rowKeyFilter, queryValueList, f)
+      if (superDynamicLogicExpression == null) {
+        superDynamicLogicExpression = logicExpression
+        superRowKeyFilter = rowKeyFilter
+      } else {
+        superDynamicLogicExpression =
+          new AndLogicExpression(superDynamicLogicExpression, logicExpression)
+        superRowKeyFilter.mergeIntersect(rowKeyFilter)
+      }
+
     })
-    superCollection
+
+    val queryValueArray = queryValueList.toArray
+
+    if (superRowKeyFilter == null) {
+      superRowKeyFilter = new RowKeyFilter
+    }
+
+    (superRowKeyFilter, superDynamicLogicExpression, queryValueArray)
   }
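
SparkSQL passes buildScan an array of top-level filters that are implicitly
ANDed together, so the loop above folds the per-filter expressions into one
left-leaning AND tree and intersects the row-key filters. A hedged sketch of
the equivalent fold over two already-built leaves (indices 0 and 1 point into
the shared query-value array):

    val exprs: Seq[DynamicLogicExpression] = Seq(
      new GreaterThanLogicExpression("KEY_FIELD", 0),
      new LessThanLogicExpression("KEY_FIELD", 1))
    val combined = exprs.reduceLeft(new AndLogicExpression(_, _))
    // combined.toExpressionString == "( KEY_FIELD > 0 AND KEY_FIELD < 1 )"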
 
-  /**
-   * Recursive function that will work to convert Spark Filter
-   * objects to ColumnFilterCollection
-   *
-   * @param parentFilterCollection Parent ColumnFilterCollection
-   * @param filter                 Current given filter from SparkSQL
-   */
-  def buildColumnFilterCollection(parentFilterCollection:ColumnFilterCollection,
-                                  filter:Filter): Unit = {
+  def transverseFilterTree(parentRowKeyFilter:RowKeyFilter,
+                                  valueArray:mutable.MutableList[Array[Byte]],
+                                  filter:Filter): DynamicLogicExpression = {
     filter match {
 
       case EqualTo(attr, value) =>
-        parentFilterCollection.mergeUnion(attr,
-          new ColumnFilter(DefaultSourceStaticUtils.getByteValue(attr,
-            schemaMappingDefinition, value.toString)))
-
+        val columnDefinition = schemaMappingDefinition.get(attr)
+        if (columnDefinition != null) {
+          if (columnDefinition.columnFamily.isEmpty) {
+            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(
+              DefaultSourceStaticUtils.getByteValue(attr,
+                schemaMappingDefinition, value.toString), null))
+          }
+          val byteValue =
+            DefaultSourceStaticUtils.getByteValue(attr,
+              schemaMappingDefinition, value.toString)
+          valueArray += byteValue
+        }
+        new EqualLogicExpression(attr, valueArray.length - 1, false)
       case LessThan(attr, value) =>
-        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
-          new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
-            schemaMappingDefinition, value.toString), false,
-            new Array[Byte](0), true)))
-
+        val columnDefinition = schemaMappingDefinition.get(attr)
+        if (columnDefinition != null) {
+          if (columnDefinition.columnFamily.isEmpty) {
+            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
+              new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
+                schemaMappingDefinition, value.toString), false,
+                new Array[Byte](0), true)))
+          }
+          val byteValue =
+            DefaultSourceStaticUtils.getByteValue(attr,
+              schemaMappingDefinition, value.toString)
+          valueArray += byteValue
+        }
+        new LessThanLogicExpression(attr, valueArray.length - 1)
       case GreaterThan(attr, value) =>
-        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
-        new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
-          schemaMappingDefinition, value.toString), false)))
-
+        val columnDefinition = schemaMappingDefinition.get(attr)
+        if (columnDefinition != null) {
+          if (columnDefinition.columnFamily.isEmpty) {
+            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
+              new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
+                schemaMappingDefinition, value.toString), false)))
+          }
+          val byteValue =
+            DefaultSourceStaticUtils.getByteValue(attr,
+              schemaMappingDefinition, value.toString)
+          valueArray += byteValue
+        }
+        new GreaterThanLogicExpression(attr, valueArray.length - 1)
       case LessThanOrEqual(attr, value) =>
-        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
-        new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
-          schemaMappingDefinition, value.toString), true,
-          new Array[Byte](0), true)))
-
+        val columnDefinition = schemaMappingDefinition.get(attr)
+        if (columnDefinition != null) {
+          if (columnDefinition.columnFamily.isEmpty) {
+            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
+              new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
+                schemaMappingDefinition, value.toString), true,
+                new Array[Byte](0), true)))
+          }
+          val byteValue =
+            DefaultSourceStaticUtils.getByteValue(attr,
+              schemaMappingDefinition, value.toString)
+          valueArray += byteValue
+        }
+        new LessThanOrEqualLogicExpression(attr, valueArray.length - 1)
       case GreaterThanOrEqual(attr, value) =>
-        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
-        new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
-          schemaMappingDefinition, value.toString), true)))
+        val columnDefinition = schemaMappingDefinition.get(attr)
+        if (columnDefinition != null) {
+          if (columnDefinition.columnFamily.isEmpty) {
+            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
+              new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
+                schemaMappingDefinition, value.toString), true)))
+          }
+          val byteValue =
+            DefaultSourceStaticUtils.getByteValue(attr,
+              schemaMappingDefinition, value.toString)
+          valueArray += byteValue
 
+        }
+        new GreaterThanOrEqualLogicExpression(attr, valueArray.length - 1)
       case Or(left, right) =>
-        buildColumnFilterCollection(parentFilterCollection, left)
-        val rightSideCollection = new ColumnFilterCollection
-        buildColumnFilterCollection(rightSideCollection, right)
-        parentFilterCollection.mergeUnion(rightSideCollection)
+        val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
+        val rightSideRowKeyFilter = new RowKeyFilter
+        val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
+
+        parentRowKeyFilter.mergeUnion(rightSideRowKeyFilter)
+
+        new OrLogicExpression(leftExpression, rightExpression)
       case And(left, right) =>
-        buildColumnFilterCollection(parentFilterCollection, left)
-        val rightSideCollection = new ColumnFilterCollection
-        buildColumnFilterCollection(rightSideCollection, right)
-        parentFilterCollection.mergeIntersect(rightSideCollection)
-      case _ => //nothing
+
+        val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
+        val rightSideRowKeyFilter = new RowKeyFilter
+        val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
+        parentRowKeyFilter.mergeIntersect(rightSideRowKeyFilter)
+
+        new AndLogicExpression(leftExpression, rightExpression)
+      case IsNull(attr) =>
+        new IsNullLogicExpression(attr, false)
+      case IsNotNull(attr) =>
+        new IsNullLogicExpression(attr, true)
+      case _ =>
+        new PassThroughLogicExpression
     }
   }
 }
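
For a concrete picture of what transverseFilterTree produces, here is a hedged
sketch of the rowKey-point query exercised in the test suite below, assuming
relation is an HBaseRelation configured with the t1 mapping (KEY_FIELD as the
rowkey):

    import org.apache.spark.sql.sources.{EqualTo, Or}
    import scala.collection.mutable

    val rowKeyFilter = new RowKeyFilter()
    val values = new mutable.MutableList[Array[Byte]]
    val expr = relation.transverseFilterTree(rowKeyFilter, values,
      Or(Or(EqualTo("KEY_FIELD", "get1"), EqualTo("KEY_FIELD", "get2")),
        EqualTo("KEY_FIELD", "get3")))
    // Each leaf appended its encoded literal to values (three entries), and
    // expr.toExpressionString ==
    //   "( ( KEY_FIELD == 0 OR KEY_FIELD == 1 ) OR KEY_FIELD == 2 )"
    // while rowKeyFilter collapsed to the three points get1, get2, get3.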
@@ -472,7 +540,7 @@ case class SchemaQualifierDefinition(columnName:String,
     else if (colType.equals("DOUBLE")) DoubleType
     else if (colType.equals("STRING")) StringType
     else if (colType.equals("TIMESTAMP")) TimestampType
-    else if (colType.equals("DECIMAL")) StringType //DataTypes.createDecimalType(precision, scale)
+    else if (colType.equals("DECIMAL")) StringType
     else throw new IllegalArgumentException("Unsupported column type :" + colType)
 }
 
@@ -524,11 +592,11 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
 
     isLowerBoundEqualTo = if (lowerBoundCompare == 0)
       isLowerBoundEqualTo || other.isLowerBoundEqualTo
-    else isLowerBoundEqualTo
+    else if (lowerBoundCompare < 0) isLowerBoundEqualTo else other.isLowerBoundEqualTo
 
     isUpperBoundEqualTo = if (upperBoundCompare == 0)
       isUpperBoundEqualTo || other.isUpperBoundEqualTo
-    else isUpperBoundEqualTo
+    else if (upperBoundCompare < 0) other.isUpperBoundEqualTo else isUpperBoundEqualTo
   }
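
This flag handling is at the heart of the HBASE-14406 fix for range merges:
when the bounds differ, the inclusivity flag must follow whichever bound wins
rather than unconditionally keeping this range's flag. A hedged worked example,
assuming mergeUnion also widens the bounds to the union as its callers expect:

    val r = new ScanRange(Bytes.toBytes("get3"), false, Bytes.toBytes("get1"), true) // [get1, get3)
    r.mergeUnion(new ScanRange(Bytes.toBytes("get5"), true, Bytes.toBytes("get1"), false)) // (get1, get5]
    // Equal lower bounds: the flags are ORed, so the start stays inclusive at get1.
    // The upper bound get5 wins (upperBoundCompare < 0), so its flag is taken and
    // the union is [get1, get5]; the old code kept the stale exclusive flag.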
 
   /**
@@ -551,14 +619,15 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
    * @param other Other scan object
   * @return      The overlapping ScanRange if the two ranges overlap, otherwise null
    */
-  def doesOverLap(other:ScanRange): Boolean = {
+  def getOverLapScanRange(other:ScanRange): ScanRange = {
 
     var leftRange:ScanRange = null
     var rightRange:ScanRange = null
 
     //First identify the Left range
     // Also lower bound can't be null
-    if (Bytes.compareTo(lowerBound, other.lowerBound) <=0) {
+    if (compareRange(lowerBound, other.lowerBound) < 0 ||
+      compareRange(upperBound, other.upperBound) < 0) {
       leftRange = this
       rightRange = other
     } else {
@@ -568,8 +637,12 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
 
     //Then see if leftRange goes to null or if leftRange.upperBound
     // upper is greater or equals to rightRange.lowerBound
-    leftRange.upperBound == null ||
-      Bytes.compareTo(leftRange.upperBound, rightRange.lowerBound) >= 0
+    if (leftRange.upperBound == null ||
+      Bytes.compareTo(leftRange.upperBound, rightRange.lowerBound) >= 0) {
+      new ScanRange(leftRange.upperBound, leftRange.isUpperBoundEqualTo, rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
+    } else {
+      null
+    }
   }
 
   /**
@@ -586,9 +659,25 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
     else if (left != null && right == null) -1
     else Bytes.compareTo(left, right)
   }
+
+  /**
+   * @param point The rowKey point to test against this range's bounds
+   * @return      True if the point falls within the range
+   */
+  def containsPoint(point:Array[Byte]): Boolean = {
+    val lowerCompare = compareRange(point, lowerBound)
+    val upperCompare = compareRange(point, upperBound)
+
+    ((isLowerBoundEqualTo && lowerCompare >= 0) ||
+      (!isLowerBoundEqualTo && lowerCompare > 0)) &&
+      ((isUpperBoundEqualTo && upperCompare <= 0) ||
+        (!isUpperBoundEqualTo && upperCompare < 0))
+
+  }
   override def toString:String = {
-    "ScanRange:(" + Bytes.toString(upperBound) + "," + isUpperBoundEqualTo + "," +
-      Bytes.toString(lowerBound) + "," + isLowerBoundEqualTo + ")"
+    "ScanRange:(upperBound:" + Bytes.toString(upperBound) +
+      ",isUpperBoundEqualTo:" + isUpperBoundEqualTo + ",lowerBound:" +
+      Bytes.toString(lowerBound) + ",isLowerBoundEqualTo:" + isLowerBoundEqualTo + ")"
   }
 }
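
The rename from doesOverLap to getOverLapScanRange changes the contract from a
boolean test to returning the intersecting range (or null), which the new
RowKeyFilter.mergeIntersect below consumes directly. A hedged sketch of both
methods; note the constructor order is (upperBound, isUpperBoundEqualTo,
lowerBound, isLowerBoundEqualTo):

    import org.apache.hadoop.hbase.util.Bytes

    val a = new ScanRange(Bytes.toBytes("get3"), true, Bytes.toBytes("get1"), true) // [get1, get3]
    val b = new ScanRange(Bytes.toBytes("get5"), true, Bytes.toBytes("get2"), true) // [get2, get5]

    a.getOverLapScanRange(b)               // non-null: the ranges intersect over [get2, get3]
    a.containsPoint(Bytes.toBytes("get2")) // true: get2 lies inside [get1, get3]
    a.containsPoint(Bytes.toBytes("get9")) // false: past the upper bound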
 
@@ -663,7 +752,7 @@ class ColumnFilter (currentPoint:Array[Byte] = null,
     other.ranges.foreach( otherR => {
       var doesOverLap = false
       ranges.foreach{ r =>
-        if (r.doesOverLap(otherR)) {
+        if (r.getOverLapScanRange(otherR) != null) {
           r.mergeUnion(otherR)
           doesOverLap = true
         }}
@@ -692,7 +781,7 @@ class ColumnFilter (currentPoint:Array[Byte] = null,
 
     other.ranges.foreach( otherR => {
       ranges.foreach( r => {
-        if (r.doesOverLap(otherR)) {
+        if (r.getOverLapScanRange(otherR) != null) {
           r.mergeIntersect(otherR)
           survivingRanges += r
         }
@@ -842,7 +931,8 @@ object DefaultSourceStaticUtils {
     getFreshByteRange(bytes, 0, bytes.length)
   }
 
-  def getFreshByteRange(bytes:Array[Byte],  offset:Int = 0, length:Int): PositionedByteRange = {
+  def getFreshByteRange(bytes:Array[Byte],  offset:Int = 0, length:Int):
+  PositionedByteRange = {
     byteRange.get().set(bytes).setLength(length).setOffset(offset)
   }
 
@@ -856,14 +946,13 @@ object DefaultSourceStaticUtils {
   * This method populates the lastFiveExecutionRules for unit test purposes.
    * This method is not thread safe.
    *
-   * @param columnFilterCollection           The filters in the last job
-   * @param requiredQualifierDefinitionArray The required columns in the last job
+   * @param rowKeyFilter           The rowKey Filter logic used in the last query
+   * @param dynamicLogicExpression The dynamicLogicExpression used in the last query
    */
-  def populateLatestExecutionRules(columnFilterCollection: ColumnFilterCollection,
-                                   requiredQualifierDefinitionArray:
-                                   mutable.MutableList[SchemaQualifierDefinition]):Unit = {
+  def populateLatestExecutionRules(rowKeyFilter: RowKeyFilter,
+                                   dynamicLogicExpression: DynamicLogicExpression):Unit = {
     lastFiveExecutionRules.add(new ExecutionRuleForUnitTesting(
-      columnFilterCollection, requiredQualifierDefinitionArray))
+      rowKeyFilter, dynamicLogicExpression))
     while (lastFiveExecutionRules.size() > 5) {
       lastFiveExecutionRules.poll()
     }
@@ -977,6 +1066,162 @@ object DefaultSourceStaticUtils {
   }
 }
 
-class ExecutionRuleForUnitTesting(val columnFilterCollection: ColumnFilterCollection,
-                                  val requiredQualifierDefinitionArray:
-                                  mutable.MutableList[SchemaQualifierDefinition])
+/**
+ * Contains information related to filters for a given row key.
+ * This can contain many ranges or points.
+ *
+ * @param currentPoint the initial point when the filter is created
+ * @param currentRange the initial scanRange when the filter is created
+ */
+class RowKeyFilter (currentPoint:Array[Byte] = null,
+                    currentRange:ScanRange =
+                    new ScanRange(null, true, new Array[Byte](0), true),
+                    var points:mutable.MutableList[Array[Byte]] =
+                    new mutable.MutableList[Array[Byte]](),
+                    var ranges:mutable.MutableList[ScanRange] =
+                    new mutable.MutableList[ScanRange]() ) extends Serializable {
+  //Collection of ranges
+  if (currentRange != null ) ranges.+=(currentRange)
+
+  //Collection of points
+  if (currentPoint != null) points.+=(currentPoint)
+
+  /**
+   * This will validate a given value against the filter's points and/or ranges;
+   * the result indicates whether the value passes the filter
+   *
+   * @param value       Value to be validated
+   * @param valueOffSet The offset of the value
+   * @param valueLength The length of the value
+   * @return            True if the value passes the filter, false if not
+   */
+  def validate(value:Array[Byte], valueOffSet:Int, valueLength:Int):Boolean = {
+    var result = false
+
+    points.foreach( p => {
+      if (Bytes.equals(p, 0, p.length, value, valueOffSet, valueLength)) {
+        result = true
+      }
+    })
+
+    ranges.foreach( r => {
+      val upperBoundPass = r.upperBound == null ||
+        (r.isUpperBoundEqualTo &&
+          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
+            value, valueOffSet, valueLength) >= 0) ||
+        (!r.isUpperBoundEqualTo &&
+          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
+            value, valueOffSet, valueLength) > 0)
+
+      val lowerBoundPass = r.lowerBound == null || r.lowerBound.length == 0 ||
+      (r.isLowerBoundEqualTo &&
+        Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
+          value, valueOffSet, valueLength) <= 0) ||
+        (!r.isLowerBoundEqualTo &&
+          Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
+            value, valueOffSet, valueLength) < 0)
+
+      result = result || (upperBoundPass && lowerBoundPass)
+    })
+    result
+  }
+
+  /**
+   * This will allow us to merge filter logic that is joined to the existing filter
+   * through an OR operator
+   *
+   * @param other Filter to merge
+   */
+  def mergeUnion(other:RowKeyFilter): Unit = {
+    other.points.foreach( p => points += p)
+
+    other.ranges.foreach( otherR => {
+      var doesOverLap = false
+      ranges.foreach{ r =>
+        if (r.getOverLapScanRange(otherR) != null) {
+          r.mergeUnion(otherR)
+          doesOverLap = true
+        }}
+      if (!doesOverLap) ranges.+=(otherR)
+    })
+  }
+
+  /**
+   * This will allow us to merge filter logic that is joined to the existing filter
+   * through an AND operator
+   *
+   * @param other Filter to merge
+   */
+  def mergeIntersect(other:RowKeyFilter): Unit = {
+    val survivingPoints = new mutable.MutableList[Array[Byte]]()
+    val didntSurviveFirstPassPoints = new mutable.MutableList[Array[Byte]]()
+    if (points == null || points.length == 0) {
+      other.points.foreach( otherP => {
+        didntSurviveFirstPassPoints += otherP
+      })
+    } else {
+      points.foreach(p => {
+        if (other.points.length == 0) {
+          didntSurviveFirstPassPoints += p
+        } else {
+          other.points.foreach(otherP => {
+            if (Bytes.equals(p, otherP)) {
+              survivingPoints += p
+            } else {
+              didntSurviveFirstPassPoints += p
+            }
+          })
+        }
+      })
+    }
+
+    val survivingRanges = new mutable.MutableList[ScanRange]()
+
+    if (ranges.length == 0) {
+      didntSurviveFirstPassPoints.foreach(p => {
+          survivingPoints += p
+      })
+    } else {
+      ranges.foreach(r => {
+        other.ranges.foreach(otherR => {
+          val overLapScanRange = r.getOverLapScanRange(otherR)
+          if (overLapScanRange != null) {
+            survivingRanges += overLapScanRange
+          }
+        })
+        didntSurviveFirstPassPoints.foreach(p => {
+          if (r.containsPoint(p)) {
+            survivingPoints += p
+          }
+        })
+      })
+    }
+    points = survivingPoints
+    ranges = survivingRanges
+  }
+
+  override def toString:String = {
+    val strBuilder = new StringBuilder
+    strBuilder.append("(points:(")
+    var isFirst = true
+    points.foreach( p => {
+      if (isFirst) isFirst = false
+      else strBuilder.append(",")
+      strBuilder.append(Bytes.toString(p))
+    })
+    strBuilder.append("),ranges:")
+    isFirst = true
+    ranges.foreach( r => {
+      if (isFirst) isFirst = false
+      else strBuilder.append(",")
+      strBuilder.append(r)
+    })
+    strBuilder.append("))")
+    strBuilder.toString()
+  }
+}
+
+
+
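
A hedged sketch of how a RowKeyFilter evolves as predicates arrive, mirroring
what transverseFilterTree does for KEY_FIELD = 'get1' OR KEY_FIELD = 'get2':

    import org.apache.hadoop.hbase.util.Bytes

    val filter = new RowKeyFilter() // starts as a single unbounded range
    // AND with a point: the point survives and the unbounded range is consumed
    filter.mergeIntersect(new RowKeyFilter(Bytes.toBytes("get1"), null))
    // OR with a second point: both points are kept
    filter.mergeUnion(new RowKeyFilter(Bytes.toBytes("get2"), null))
    // filter.points now holds get1 and get2; filter.ranges is empty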
+class ExecutionRuleForUnitTesting(val rowKeyFilter: RowKeyFilter,
+                                  val dynamicLogicExpression: DynamicLogicExpression)

http://git-wip-us.apache.org/repos/asf/hbase/blob/dae1775a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
new file mode 100644
index 0000000..fa61860
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.util
+
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Dynamic logic for SQL push down. There is an instance for most
+ * common operations and a pass-through for other operations not covered here.
+ *
+ * Logic can be nested with And or Or operators.
+ *
+ * A logic tree can be written out as a string and reconstructed from that string
+ *
+ */
+trait DynamicLogicExpression {
+  def execute(columnToCurrentRowValueMap: util.HashMap[String, ByteArrayComparable],
+              valueFromQueryValueArray:Array[Array[Byte]]): Boolean
+  def toExpressionString: String = {
+    val strBuilder = new StringBuilder
+    appendToExpression(strBuilder)
+    strBuilder.toString()
+  }
+  def appendToExpression(strBuilder:StringBuilder)
+}
+
+class AndLogicExpression (val leftExpression:DynamicLogicExpression,
+                           val rightExpression:DynamicLogicExpression)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    leftExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray) &&
+      rightExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)
+  }
+
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append("( ")
+    strBuilder.append(leftExpression.toExpressionString)
+    strBuilder.append(" AND ")
+    strBuilder.append(rightExpression.toExpressionString)
+    strBuilder.append(" )")
+  }
+}
+
+class OrLogicExpression (val leftExpression:DynamicLogicExpression,
+                          val rightExpression:DynamicLogicExpression)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    leftExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray) ||
+      rightExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)
+  }
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append("( ")
+    strBuilder.append(leftExpression.toExpressionString)
+    strBuilder.append(" OR ")
+    strBuilder.append(rightExpression.toExpressionString)
+    strBuilder.append(" )")
+  }
+}
+
+class EqualLogicExpression (val columnName:String,
+                            val valueFromQueryIndex:Int,
+                            val isNot:Boolean) extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
+    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
+
+    currentRowValue != null &&
+      Bytes.equals(valueFromQuery,
+        0, valueFromQuery.length, currentRowValue.bytes,
+        currentRowValue.offset, currentRowValue.length) != isNot
+  }
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    val command = if (isNot) "!=" else "=="
+    strBuilder.append(columnName + " " + command + " " + valueFromQueryIndex)
+  }
+}
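
A hedged note on the isNot flag: the one class covers both operators, since
appendToExpression emits "==" or "!=" and execute XORs the byte comparison
with the flag:

    new EqualLogicExpression("B_FIELD", 0, isNot = true).toExpressionString // "B_FIELD != 0"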
+
+class IsNullLogicExpression (val columnName:String,
+                             val isNot:Boolean) extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
+
+    (currentRowValue == null) != isNot
+  }
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    val command = if (isNot) "isNotNull" else "isNull"
+    strBuilder.append(columnName + " " + command)
+  }
+}
+
+class GreaterThanLogicExpression (val columnName:String,
+                                  val valueFromQueryIndex:Int)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
+    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
+
+    currentRowValue != null &&
+      Bytes.compareTo(currentRowValue.bytes,
+        currentRowValue.offset, currentRowValue.length, valueFromQuery,
+        0, valueFromQuery.length) > 0
+  }
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append(columnName + " > " + valueFromQueryIndex)
+  }
+}
+
+class GreaterThanOrEqualLogicExpression (val columnName:String,
+                                         val valueFromQueryIndex:Int)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
+    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
+
+    currentRowValue != null &&
+      Bytes.compareTo(currentRowValue.bytes,
+        currentRowValue.offset, currentRowValue.length, valueFromQuery,
+        0, valueFromQuery.length) >= 0
+  }
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append(columnName + " >= " + valueFromQueryIndex)
+  }
+}
+
+class LessThanLogicExpression (val columnName:String,
+                               val valueFromQueryIndex:Int)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
+    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
+
+    currentRowValue != null &&
+      Bytes.compareTo(currentRowValue.bytes,
+        currentRowValue.offset, currentRowValue.length, valueFromQuery,
+        0, valueFromQuery.length) < 0
+  }
+
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append(columnName + " < " + valueFromQueryIndex)
+  }
+}
+
+class LessThanOrEqualLogicExpression (val columnName:String,
+                                      val valueFromQueryIndex:Int)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
+    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
+
+    currentRowValue != null &&
+      Bytes.compareTo(currentRowValue.bytes,
+        currentRowValue.offset, currentRowValue.length, valueFromQuery,
+        0, valueFromQuery.length) <= 0
+  }
+
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append(columnName + " <= " + valueFromQueryIndex)
+  }
+}
+
+class PassThroughLogicExpression() extends DynamicLogicExpression {
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray: Array[Array[Byte]]): Boolean = true
+
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append("Pass")
+  }
+}
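
PassThroughLogicExpression acts as a safety valve: any Spark filter the
traversal cannot translate evaluates to true on the region server, so no rows
are wrongly dropped and Spark re-applies the original predicate on the client.
A hedged illustration:

    val pass = new PassThroughLogicExpression
    pass.execute(new util.HashMap[String, ByteArrayComparable](), Array.empty[Array[Byte]]) // true
    pass.toExpressionString // "Pass"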
+
+object DynamicLogicExpressionBuilder {
+  def build(expressionString:String): DynamicLogicExpression = {
+
+    val expressionAndOffset = build(expressionString.split(' '), 0)
+    expressionAndOffset._1
+  }
+
+  private def build(expressionArray:Array[String],
+                    offSet:Int): (DynamicLogicExpression, Int) = {
+    if (expressionArray(offSet).equals("(")) {
+      val left = build(expressionArray, offSet + 1)
+      val right = build(expressionArray, left._2 + 1)
+      if (expressionArray(left._2).equals("AND")) {
+        (new AndLogicExpression(left._1, right._1), right._2 + 1)
+      } else if (expressionArray(left._2).equals("OR")) {
+        (new OrLogicExpression(left._1, right._1), right._2 + 1)
+      } else {
+        throw new Throwable("Unknown gate:" + expressionArray(left._2))
+      }
+    } else {
+      val command = expressionArray(offSet + 1)
+      if (command.equals("<")) {
+        (new LessThanLogicExpression(expressionArray(offSet),
+          expressionArray(offSet + 2).toInt), offSet + 3)
+      } else if (command.equals("<=")) {
+        (new LessThanOrEqualLogicExpression(expressionArray(offSet),
+          expressionArray(offSet + 2).toInt), offSet + 3)
+      } else if (command.equals(">")) {
+        (new GreaterThanLogicExpression(expressionArray(offSet),
+          expressionArray(offSet + 2).toInt), offSet + 3)
+      } else if (command.equals(">=")) {
+        (new GreaterThanOrEqualLogicExpression(expressionArray(offSet),
+          expressionArray(offSet + 2).toInt), offSet + 3)
+      } else if (command.equals("==")) {
+        (new EqualLogicExpression(expressionArray(offSet),
+          expressionArray(offSet + 2).toInt, false), offSet + 3)
+      } else if (command.equals("!=")) {
+        (new EqualLogicExpression(expressionArray(offSet),
+          expressionArray(offSet + 2).toInt, true), offSet + 3)
+      } else if (command.equals("isNull")) {
+        (new IsNullLogicExpression(expressionArray(offSet), false), offSet + 2)
+      } else if (command.equals("isNotNull")) {
+        (new IsNullLogicExpression(expressionArray(offSet), true), offSet + 2)
+      } else if (command.equals("Pass")) {
+        (new PassThroughLogicExpression, offSet + 2)
+      } else {
+        throw new Throwable("Unknown logic command:" + command)
+      }
+    }
+  }
+}
\ No newline at end of file
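
To close the loop, a hedged sketch of the serialize/rebuild/evaluate cycle the
builder enables; the expression string comes from the test suite below, and the
ByteArrayComparable(bytes, offset, length) constructor is assumed from this
package:

    import java.util
    import org.apache.hadoop.hbase.util.Bytes

    val expr = DynamicLogicExpressionBuilder.build("( KEY_FIELD < 0 OR KEY_FIELD > 1 )")

    // Query values referenced by indices 0 and 1 in the expression string
    val values = Array(Bytes.toBytes("get2"), Bytes.toBytes("get3"))

    val key = Bytes.toBytes("get1")
    val row = new util.HashMap[String, ByteArrayComparable]()
    row.put("KEY_FIELD", new ByteArrayComparable(key, 0, key.length)) // constructor assumed

    expr.execute(row, values) // true: "get1" sorts before "get2"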

http://git-wip-us.apache.org/repos/asf/hbase/blob/dae1775a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index fb475b0..2cee3a8 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -29,7 +29,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   @transient var sc: SparkContext = null
   var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
 
-  val tableName = "t1"
+  val t1TableName = "t1"
+  val t2TableName = "t2"
   val columnFamily = "c"
 
   var sqlContext:SQLContext = null
@@ -41,50 +42,94 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     logInfo(" - minicluster started")
     try
-      TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+      TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
     catch {
-      case e: Exception => logInfo(" - no table " + tableName + " found")
-
+      case e: Exception => logInfo(" - no table " + t1TableName + " found")
     }
-    logInfo(" - creating table " + tableName)
-    TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
+    try
+      TEST_UTIL.deleteTable(TableName.valueOf(t2TableName))
+    catch {
+      case e: Exception => logInfo(" - no table " + t2TableName + " found")
+    }
+    logInfo(" - creating table " + t1TableName)
+    TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily))
+    logInfo(" - created table")
+    logInfo(" - creating table " + t2TableName)
+    TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
     logInfo(" - created table")
 
     sc = new SparkContext("local", "test")
 
     val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
-    val table = connection.getTable(TableName.valueOf("t1"))
-
     try {
-      var put = new Put(Bytes.toBytes("get1"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
-      table.put(put)
-      put = new Put(Bytes.toBytes("get2"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
-      table.put(put)
-      put = new Put(Bytes.toBytes("get3"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
-      table.put(put)
-      put = new Put(Bytes.toBytes("get4"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
-      table.put(put)
-      put = new Put(Bytes.toBytes("get5"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
-      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
-      table.put(put)
+      val t1Table = connection.getTable(TableName.valueOf("t1"))
+
+      try {
+        var put = new Put(Bytes.toBytes("get1"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
+        t1Table.put(put)
+        put = new Put(Bytes.toBytes("get2"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
+        t1Table.put(put)
+        put = new Put(Bytes.toBytes("get3"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+        t1Table.put(put)
+        put = new Put(Bytes.toBytes("get4"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
+        t1Table.put(put)
+        put = new Put(Bytes.toBytes("get5"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+        t1Table.put(put)
+      } finally {
+        t1Table.close()
+      }
+
+      val t2Table = connection.getTable(TableName.valueOf("t2"))
+
+      try {
+        var put = new Put(Bytes.toBytes(1))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
+        t2Table.put(put)
+        put = new Put(Bytes.toBytes(2))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
+        t2Table.put(put)
+        put = new Put(Bytes.toBytes(3))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+        t2Table.put(put)
+        put = new Put(Bytes.toBytes(4))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
+        t2Table.put(put)
+        put = new Put(Bytes.toBytes(5))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+        t2Table.put(put)
+      } finally {
+        t2Table.close()
+      }
     } finally {
-      table.close()
       connection.close()
     }
 
@@ -98,23 +143,36 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
         "hbase.batching.num" -> "100",
         "cachingNum" -> "100"))
 
-    df.registerTempTable("hbaseTmp")
+    df.registerTempTable("hbaseTable1")
+
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD INT :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
+        "hbase.table" -> "t2",
+        "hbase.batching.num" -> "100",
+        "cachingNum" -> "100"))
+
+    df.registerTempTable("hbaseTable2")
   }
 
   override def afterAll() {
-    TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+    TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
     logInfo("shuting down minicluster")
     TEST_UTIL.shutdownMiniCluster()
 
     sc.stop()
   }
 
+  override def beforeEach(): Unit = {
+    DefaultSourceStaticUtils.lastFiveExecutionRules.clear()
+  }
+
 
   /**
    * An example of querying three fields, using only rowKey points for the filter
    */
   test("Test rowKey point only rowKey query") {
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
       "WHERE " +
       "(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')").take(10)
 
@@ -122,23 +180,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     assert(results.length == 3)
 
-    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
-    val keyFieldFilter =
-      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
-    assert(keyFieldFilter.ranges.length == 0)
-    assert(keyFieldFilter.points.length == 3)
-    assert(Bytes.toString(keyFieldFilter.points.head).equals("get1"))
-    assert(Bytes.toString(keyFieldFilter.points(1)).equals("get2"))
-    assert(Bytes.toString(keyFieldFilter.points(2)).equals("get3"))
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( ( KEY_FIELD == 0 OR KEY_FIELD == 1 ) OR KEY_FIELD == 2 )"))
 
-    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+    assert(executionRules.rowKeyFilter.points.size == 3)
+    assert(executionRules.rowKeyFilter.ranges.size == 0)
   }
 
   /**
    * An example of querying three fields, using only cell points for the filter
    */
   test("Test cell point only rowKey query") {
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
       "WHERE " +
       "(B_FIELD = '4' or B_FIELD = '10' or A_FIELD = 'foo1')").take(10)
 
@@ -146,17 +199,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     assert(results.length == 3)
 
-    assert(executionRules.columnFilterCollection.columnFilterMap.size == 2)
-    val bFieldFilter =
-      executionRules.columnFilterCollection.columnFilterMap.get("B_FIELD").get
-    assert(bFieldFilter.ranges.length == 0)
-    assert(bFieldFilter.points.length == 2)
-    val aFieldFilter =
-      executionRules.columnFilterCollection.columnFilterMap.get("A_FIELD").get
-    assert(aFieldFilter.ranges.length == 0)
-    assert(aFieldFilter.points.length == 1)
-
-    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
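+    // Cell-column predicates are evaluated by the dynamic logic expression;
+    // they do not restrict the rowKey scan.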
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( ( B_FIELD == 0 OR B_FIELD == 1 ) OR A_FIELD == 2 )"))
   }
 
   /**
@@ -164,7 +208,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   * Also an example of less than and greater than
    */
   test("Test two range rowKey query") {
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
       "WHERE " +
       "( KEY_FIELD < 'get2' or KEY_FIELD > 'get3')").take(10)
 
@@ -172,13 +216,88 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     assert(results.length == 3)
 
-    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
-    val keyFieldFilter =
-      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
-    assert(keyFieldFilter.ranges.length == 2)
-    assert(keyFieldFilter.points.length == 0)
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
+
+    assert(executionRules.rowKeyFilter.points.size == 0)
+    assert(executionRules.rowKeyFilter.ranges.size == 2)
+
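+    // KEY_FIELD < 'get2' becomes the range [ "", "get2" ) and
+    // KEY_FIELD > 'get3' becomes ( "get3", unbounded ].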
+    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+    assert(Bytes.equals(scanRange1.upperBound,Bytes.toBytes("get2")))
+    assert(scanRange1.isLowerBoundEqualTo)
+    assert(!scanRange1.isUpperBoundEqualTo)
+
+    val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
+    assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes("get3")))
+    assert(scanRange2.upperBound == null)
+    assert(!scanRange2.isLowerBoundEqualTo)
+    assert(scanRange2.isUpperBoundEqualTo)
+  }
+
+  /**
+   * An example of an OR merge between two ranges where the result is one range.
+   * Also an example of less than and greater than.
+   *
+   * This example makes sure the code works for an int rowKey.
+   */
+  test("Test two range rowKey query where the rowKey is Int and there is a range over lap") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
+      "WHERE " +
+      "( KEY_FIELD < 4 or KEY_FIELD > 2)").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
+
+    assert(executionRules.rowKeyFilter.points.size == 0)
+    assert(executionRules.rowKeyFilter.ranges.size == 1)
+
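+    // The two ranges overlap, so they merge into a single unbounded scan range.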
+    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+    assert(scanRange1.upperBound == null)
+    assert(scanRange1.isLowerBoundEqualTo)
+    assert(scanRange1.isUpperBoundEqualTo)
+
+    assert(results.length == 5)
+  }
+
+  /**
+   * An example of an OR merge between two ranges where the result is two ranges.
+   * Also an example of less than and greater than.
+   *
+   * This example makes sure the code works for an int rowKey.
+   */
+  test("Test two range rowKey query where the rowKey is Int and the ranges don't over lap") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
+      "WHERE " +
+      "( KEY_FIELD < 2 or KEY_FIELD > 4)").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
+
+    assert(executionRules.rowKeyFilter.points.size == 0)
+
+    assert(executionRules.rowKeyFilter.ranges.size == 2)
 
-    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
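+    // The ranges do not touch, so two separate scan ranges are kept.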
+    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+    assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
+    assert(scanRange1.isLowerBoundEqualTo)
+    assert(!scanRange1.isUpperBoundEqualTo)
+
+    val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
+    assert(Bytes.equals(scanRange2.lowerBound, Bytes.toBytes(4)))
+    assert(scanRange2.upperBound == null)
+    assert(!scanRange2.isLowerBoundEqualTo)
+    assert(scanRange2.isUpperBoundEqualTo)
+
+    assert(results.length == 2)
   }
 
   /**
@@ -186,7 +305,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   * Also an example of less than or equal to and greater than or equal to
    */
   test("Test one combined range rowKey query") {
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
       "WHERE " +
       "(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
 
@@ -194,13 +313,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     assert(results.length == 2)
 
-    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
-    val keyFieldFilter =
-      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
-    assert(keyFieldFilter.ranges.length == 1)
-    assert(keyFieldFilter.points.length == 0)
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( KEY_FIELD <= 0 AND KEY_FIELD >= 1 )"))
+
+    assert(executionRules.rowKeyFilter.points.size == 0)
+    assert(executionRules.rowKeyFilter.ranges.size == 1)
+
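+    // The ANDed bounds collapse into the single inclusive range [ "get2", "get3" ].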
+    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get2")))
+    assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get3")))
+    assert(scanRange1.isLowerBoundEqualTo)
+    assert(scanRange1.isUpperBoundEqualTo)
 
-    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
   }
 
   /**
@@ -211,10 +335,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     val results = df.select("KEY_FIELD").take(10)
     assert(results.length == 5)
 
-
     val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-    assert(executionRules.columnFilterCollection == null)
-    assert(executionRules.requiredQualifierDefinitionArray.length == 0)
+
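+    // With no WHERE clause there is nothing to push down, so no expression is built.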
+    assert(executionRules.dynamicLogicExpression == null)
 
   }
 
@@ -223,38 +346,28 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   * rowKey and a column
    */
   test("Test SQL point and range combo") {
-    val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTmp " +
+    val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTable1 " +
       "WHERE " +
       "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
       "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
 
     val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
 
-    assert(results.length == 3)
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( ( KEY_FIELD == 0 AND B_FIELD < 1 ) OR " +
+      "( KEY_FIELD >= 2 AND B_FIELD == 3 ) )"))
+
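+    // KEY_FIELD = 'get1' contributes the single point;
+    // KEY_FIELD >= 'get3' contributes the single range.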
+    assert(executionRules.rowKeyFilter.points.size == 1)
+    assert(executionRules.rowKeyFilter.ranges.size == 1)
 
-    assert(executionRules.columnFilterCollection.columnFilterMap.size == 2)
-    val keyFieldFilter =
-      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
-    assert(keyFieldFilter.ranges.length == 1)
-    assert(keyFieldFilter.ranges.head.upperBound == null)
-    assert(Bytes.toString(keyFieldFilter.ranges.head.lowerBound).equals("get3"))
-    assert(keyFieldFilter.ranges.head.isLowerBoundEqualTo)
-    assert(keyFieldFilter.points.length == 1)
-    assert(Bytes.toString(keyFieldFilter.points.head).equals("get1"))
-
-    val bFieldFilter =
-      executionRules.columnFilterCollection.columnFilterMap.get("B_FIELD").get
-    assert(bFieldFilter.ranges.length == 1)
-    assert(bFieldFilter.ranges.head.lowerBound.length == 0)
-    assert(Bytes.toString(bFieldFilter.ranges.head.upperBound).equals("3"))
-    assert(!bFieldFilter.ranges.head.isUpperBoundEqualTo)
-    assert(bFieldFilter.points.length == 1)
-    assert(Bytes.toString(bFieldFilter.points.head).equals("8"))
-
-    assert(executionRules.requiredQualifierDefinitionArray.length == 1)
-    assert(executionRules.requiredQualifierDefinitionArray.head.columnName.equals("B_FIELD"))
-    assert(executionRules.requiredQualifierDefinitionArray.head.columnFamily.equals("c"))
-    assert(executionRules.requiredQualifierDefinitionArray.head.qualifier.equals("b"))
+    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get3")))
+    assert(scanRange1.upperBound == null)
+    assert(scanRange1.isLowerBoundEqualTo)
+    assert(scanRange1.isUpperBoundEqualTo)
+
+    assert(results.length == 3)
   }
 
   /**
@@ -262,46 +375,145 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
    */
   test("Test two complete range non merge rowKey query") {
 
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
       "WHERE " +
-      "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" +
-      "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10)
+      "( KEY_FIELD >= 1 and KEY_FIELD <= 2) or" +
+      "( KEY_FIELD > 3 and KEY_FIELD <= 5)").take(10)
 
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
     assert(results.length == 4)
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
+      "( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"))
+
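+    // The int ranges [1, 2] and (3, 5] do not overlap, so no merge occurs.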
+    assert(executionRules.rowKeyFilter.points.size == 0)
+    assert(executionRules.rowKeyFilter.ranges.size == 2)
 
-    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
-    val keyFieldFilter =
-      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
-    assert(keyFieldFilter.ranges.length == 2)
-    assert(keyFieldFilter.points.length == 0)
+    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes(1)))
+    assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
+    assert(scanRange1.isLowerBoundEqualTo)
+    assert(scanRange1.isUpperBoundEqualTo)
+
+    val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
+    assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes(3)))
+    assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes(5)))
+    assert(!scanRange2.isLowerBoundEqualTo)
+    assert(scanRange2.isUpperBoundEqualTo)
 
-    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
   }
 
   /**
   * A complex query with two complex ranges that are kept as two separate ranges
    */
   test("Test two complete range merge rowKey query") {
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
       "WHERE " +
-      "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
+      "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" +
       "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10)
 
     val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
 
+    assert(results.length == 4)
+
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
+      "( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"))
+
+    assert(executionRules.rowKeyFilter.points.size == 0)
+    assert(executionRules.rowKeyFilter.ranges.size == 2)
+
+    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get1")))
+    assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get2")))
+    assert(scanRange1.isLowerBoundEqualTo)
+    assert(scanRange1.isUpperBoundEqualTo)
+
+    val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
+    assert(Bytes.equals(scanRange2.lowerBound, Bytes.toBytes("get3")))
+    assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes("get5")))
+    assert(!scanRange2.isLowerBoundEqualTo)
+    assert(scanRange2.isUpperBoundEqualTo)
+  }
+
+  test("Test OR logic with a one RowKey and One column") {
+
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
+      "WHERE " +
+      "( KEY_FIELD >= 'get1' or A_FIELD <= 'foo2') or" +
+      "( KEY_FIELD > 'get3' or B_FIELD <= '4')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 5)
+
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( ( KEY_FIELD >= 0 OR A_FIELD <= 1 ) OR " +
+      "( KEY_FIELD > 2 OR B_FIELD <= 3 ) )"))
+
+    assert(executionRules.rowKeyFilter.points.size == 0)
+    assert(executionRules.rowKeyFilter.ranges.size == 1)
+
+    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+    // This is the main test for HBASE-14406: because the rowKey predicate is
+    // ORed with a column qualifier predicate, no filter can be applied to the
+    // rowKey and the scan range stays unbounded.
+    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+    assert(scanRange1.upperBound == null)
+    assert(scanRange1.isLowerBoundEqualTo)
+    assert(scanRange1.isUpperBoundEqualTo)
+  }
+
+  test("Test OR logic with a two columns") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
+      "WHERE " +
+      "( B_FIELD > '4' or A_FIELD <= 'foo2') or" +
+      "( A_FIELD > 'foo2' or B_FIELD < '4')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
     assert(results.length == 5)
 
-    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
-    val keyFieldFilter =
-      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
-    assert(keyFieldFilter.ranges.length == 1)
-    assert(keyFieldFilter.points.length == 0)
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( ( B_FIELD > 0 OR A_FIELD <= 1 ) OR " +
+      "( A_FIELD > 2 OR B_FIELD < 3 ) )"))
+
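+    // Only column predicates are present, so the rowKey scan stays unbounded.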
+    assert(executionRules.rowKeyFilter.points.size == 0)
+    assert(executionRules.rowKeyFilter.ranges.size == 1)
+
+    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+    assert(scanRange1.upperBound == null)
+    assert(scanRange1.isLowerBoundEqualTo)
+    assert(scanRange1.isUpperBoundEqualTo)
 
-    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
   }
 
-  test("test table that doesn't exist") {
+  test("Test single RowKey Or Column logic") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
+      "WHERE " +
+      "( KEY_FIELD >= 'get4' or A_FIELD <= 'foo2' )").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 4)
+
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( KEY_FIELD >= 0 OR A_FIELD <= 1 )"))
+
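+    // The OR with a column predicate prevents rowKey pruning;
+    // a single unbounded range remains.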
+    assert(executionRules.rowKeyFilter.points.size == 0)
+    assert(executionRules.rowKeyFilter.ranges.size == 1)
+
+    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+    assert(scanRange1.upperBound == null)
+    assert(scanRange1.isLowerBoundEqualTo)
+    assert(scanRange1.isUpperBoundEqualTo)
+  }
+
+  test("Test table that doesn't exist") {
     intercept[TableNotFoundException] {
       df = sqlContext.load("org.apache.hadoop.hbase.spark",
         Map("hbase.columns.mapping" ->
@@ -315,6 +527,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
         "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
         "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").count()
     }
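+    // Clear out any execution rules this query may have registered so they
+    // do not leak into later tests.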
+    DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
   }
 
   test("Test table with column that doesn't exist") {
@@ -325,11 +538,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     df.registerTempTable("hbaseFactColumnTmp")
 
-    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseFactColumnTmp")
+    val result = sqlContext.sql("SELECT KEY_FIELD, " +
+      "B_FIELD, A_FIELD FROM hbaseFactColumnTmp")
 
     assert(result.count() == 5)
 
-    val localResult = result.take(5)
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+    assert(executionRules.dynamicLogicExpression == null)
+
   }
 
   test("Test table with INT column") {
@@ -343,11 +559,15 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntTmp"+
     " where I_FIELD > 4 and I_FIELD < 10")
 
-    assert(result.count() == 2)
-
-    val localResult = result.take(3)
+    val localResult = result.take(5)
 
+    assert(localResult.length == 2)
     assert(localResult(0).getInt(2) == 8)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
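+    // Both int comparisons are pushed down as a single AND expression.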
+    assert(executionRules.dynamicLogicExpression.toExpressionString.
+      equals("( I_FIELD > 0 AND I_FIELD < 1 )"))
+
   }
 
   test("Test table with INT column defined at wrong type") {
@@ -358,11 +578,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     df.registerTempTable("hbaseIntWrongTypeTmp")
 
-    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+    val result = sqlContext.sql("SELECT KEY_FIELD, " +
+      "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
 
-    assert(result.count() == 5)
+    val localResult = result.take(10)
+    assert(localResult.length == 5)
 
-    val localResult = result.take(5)
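+    // No WHERE clause, so no expression should have been pushed down.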
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+    assert(executionRules.dynamicLogicExpression == null)
 
     assert(localResult(0).getString(2).length == 4)
     assert(localResult(0).getString(2).charAt(0).toByte == 0)
@@ -380,9 +603,13 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
       df.registerTempTable("hbaseBadTmp")
 
-      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseBadTmp")
+      val result = sqlContext.sql("SELECT KEY_FIELD, " +
+        "B_FIELD, I_FIELD FROM hbaseBadTmp")
 
-      val localResult = result.take(5)
+      val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+      assert(executionRules.dynamicLogicExpression == null)
+
+      result.take(5)
     }
   }
 
@@ -396,11 +623,15 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
       df.registerTempTable("hbaseIntWrongTypeTmp")
 
-      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+      val result = sqlContext.sql("SELECT KEY_FIELD, " +
+        "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
 
-      assert(result.count() == 5)
+      val localResult = result.take(10)
+      assert(localResult.length == 5)
+
+      val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+      assert(executionRules.dynamicLogicExpression == null)
 
-      val localResult = result.take(5)
     }
   }
 
@@ -413,11 +644,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
       df.registerTempTable("hbaseIntWrongTypeTmp")
 
-      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+      val result = sqlContext.sql("SELECT KEY_FIELD, " +
+        "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
 
       assert(result.count() == 5)
 
       val localResult = result.take(5)
+      assert(localResult.length == 5)
+
+      val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+      assert(executionRules.dynamicLogicExpression == null)
+
     }
   }
 
@@ -430,11 +668,16 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
       df.registerTempTable("hbaseIntWrongTypeTmp")
 
-      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, " +
+        "I_FIELD FROM hbaseIntWrongTypeTmp")
+
+      val localResult = result.take(10)
+      assert(localResult.length == 5)
+
+      val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+      assert(executionRules.dynamicLogicExpression == null)
 
-      assert(result.count() == 5)
 
-      val localResult = result.take(5)
     }
   }
 
@@ -448,9 +691,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, Z_FIELD FROM hbaseZTmp")
 
-    assert(result.count() == 5)
-
-    val localResult = result.take(5)
+    val localResult = result.take(10)
+    assert(localResult.length == 5)
 
     assert(localResult(0).getString(2) == null)
     assert(localResult(1).getString(2) == "FOO")
@@ -458,5 +700,27 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(localResult(3).getString(2) == "BAR")
     assert(localResult(4).getString(2) == null)
 
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+    assert(executionRules.dynamicLogicExpression == null)
+  }
+
+  test("Test with column logic disabled") {
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, Z_FIELD STRING c:z,",
+        "hbase.table" -> "t1",
+        "hbase.push.down.column.filter" -> "false"))
+
+    df.registerTempTable("hbaseNoPushDownTmp")
+
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseNoPushDownTmp " +
+      "WHERE " +
+      "(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 2)
+
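+    // Push-down is disabled, so no expression is built even though the
+    // query has a WHERE clause.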
+    assert(executionRules.dynamicLogicExpression == null)
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/dae1775a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
new file mode 100644
index 0000000..3140ebd
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.util
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.Logging
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+class DynamicLogicExpressionSuite extends FunSuite with
+BeforeAndAfterEach with BeforeAndAfterAll with Logging {
+
+  test("Basic And Test") {
+    val leftLogic = new LessThanLogicExpression("Col1", 0)
+    val rightLogic = new GreaterThanLogicExpression("Col1", 1)
+    val andLogic = new AndLogicExpression(leftLogic, rightLogic)
+
+    val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
+
+    columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
+    val valueFromQueryValueArray = new Array[Array[Byte]](2)
+    valueFromQueryValueArray(0) = Bytes.toBytes(15)
+    valueFromQueryValueArray(1) = Bytes.toBytes(5)
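+    // Current row value is 10: 10 < 15 AND 10 > 5, so the expression is true.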
+    assert(andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(15)
+    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    val expressionString = andLogic.toExpressionString
+
+    assert(expressionString.equals("( Col1 < 0 AND Col1 > 1 )"))
+
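+    // Rebuilding the expression from its string form must behave identically.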
+    val builtExpression = DynamicLogicExpressionBuilder.build(expressionString)
+    valueFromQueryValueArray(0) = Bytes.toBytes(15)
+    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(15)
+    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+  }
+
+  test("Basic OR Test") {
+    val leftLogic = new LessThanLogicExpression("Col1", 0)
+    val rightLogic = new GreaterThanLogicExpression("Col1", 1)
+    val orLogic = new OrLogicExpression(leftLogic, rightLogic)
+
+    val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
+
+    columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
+    val valueFromQueryValueArray = new Array[Array[Byte]](2)
+    valueFromQueryValueArray(0) = Bytes.toBytes(15)
+    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    assert(orLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    assert(orLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(15)
+    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    assert(orLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
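+    // Both 10 < 10 and 10 > 10 are false, so the OR is false.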
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    assert(!orLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    val expressionString = orLogic.toExpressionString
+
+    assert(expressionString.equals("( Col1 < 0 OR Col1 > 1 )"))
+
+    val builtExpression = DynamicLogicExpressionBuilder.build(expressionString)
+    valueFromQueryValueArray(0) = Bytes.toBytes(15)
+    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(15)
+    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+  }
+
+  test("Basic Command Test") {
+    val greaterLogic = new GreaterThanLogicExpression("Col1", 0)
+    val greaterAndEqualLogic = new GreaterThanOrEqualLogicExpression("Col1", 0)
+    val lessLogic = new LessThanLogicExpression("Col1", 0)
+    val lessAndEqualLogic = new LessThanOrEqualLogicExpression("Col1", 0)
+    val equalLogic = new EqualLogicExpression("Col1", 0, false)
+    val notEqualLogic = new EqualLogicExpression("Col1", 0, true)
+    val passThrough = new PassThroughLogicExpression
+
+    val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
+    columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
+    val valueFromQueryValueArray = new Array[Array[Byte]](1)
+
+    // greater than
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(20)
+    assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
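+    // 10 > 5 holds, so the positive case passes
+    valueFromQueryValueArray(0) = Bytes.toBytes(5)
+    assert(greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))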
+
+    // greater than or equal
+    valueFromQueryValueArray(0) = Bytes.toBytes(5)
+    assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
+      valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
+      valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(20)
+    assert(!greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
+      valueFromQueryValueArray))
+
+    // less than
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(5)
+    assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
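+    // 10 < 20 holds, so the positive case passes
+    valueFromQueryValueArray(0) = Bytes.toBytes(20)
+    assert(lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))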
+
+    // less than or equal
+    valueFromQueryValueArray(0) = Bytes.toBytes(20)
+    assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    // 10 <= 5 does not hold, so the negative case must fail
+    valueFromQueryValueArray(0) = Bytes.toBytes(5)
+    assert(!lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    // equal to
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    assert(equalLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(5)
+    assert(!equalLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    // not equal to
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    assert(!notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(5)
+    assert(notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    // pass through (always evaluates to true)
+    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    assert(passThrough.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = Bytes.toBytes(5)
+    assert(passThrough.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+  }
+
+}