You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/10/16 20:22:17 UTC
[2/5] hbase git commit: HBASE-14406 The dataframe datasource filter
is wrong, and will result in data loss or unexpected behavior (Ted Malaska)
http://git-wip-us.apache.org/repos/asf/hbase/blob/dae1775a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index a180de2..23480bb 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result, Scan}
import org.apache.hadoop.hbase.types._
-import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange, PositionedByteRange, Bytes}
+import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange,
+PositionedByteRange, Bytes}
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
@@ -44,7 +45,7 @@ import scala.collection.mutable
* - Type conversions of basic SQL types. All conversions will be
* Through the HBase Bytes object commands.
*/
-class DefaultSource extends RelationProvider {
+class DefaultSource extends RelationProvider with Logging {
val TABLE_KEY:String = "hbase.table"
val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping"
@@ -52,6 +53,7 @@ class DefaultSource extends RelationProvider {
val CACHING_NUM_KEY:String = "hbase.caching.num"
val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources"
val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context"
+ val PUSH_DOWN_COLUMN_FILTER:String = "hbase.push.down.column.filter"
/**
* Is given input from SparkSQL to construct a BaseRelation
@@ -73,6 +75,7 @@ class DefaultSource extends RelationProvider {
val cachingNumStr = parameters.getOrElse(CACHING_NUM_KEY, "1000")
val hbaseConfigResources = parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "")
val useHBaseReources = parameters.getOrElse(USE_HBASE_CONTEXT, "true")
+ val usePushDownColumnFilter = parameters.getOrElse(PUSH_DOWN_COLUMN_FILTER, "true")
val batchingNum:Int = try {
batchingNumStr.toInt
@@ -95,7 +98,8 @@ class DefaultSource extends RelationProvider {
batchingNum.toInt,
cachingNum.toInt,
hbaseConfigResources,
- useHBaseReources.equalsIgnoreCase("true"))(sqlContext)
+ useHBaseReources.equalsIgnoreCase("true"),
+ usePushDownColumnFilter.equalsIgnoreCase("true"))(sqlContext)
}
/**
@@ -161,7 +165,8 @@ class HBaseRelation (val tableName:String,
val batchingNum:Int,
val cachingNum:Int,
val configResources:String,
- val useHBaseContext:Boolean) (
+ val useHBaseContext:Boolean,
+ val usePushDownColumnFilter:Boolean) (
@transient val sqlContext:SQLContext)
extends BaseRelation with PrunedFilteredScan with Logging {
@@ -217,151 +222,158 @@ class HBaseRelation (val tableName:String,
*/
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
- val columnFilterCollection = buildColumnFilterCollection(filters)
+ val pushDownTuple = buildPushDownPredicatesResource(filters)
+ val pushDownRowKeyFilter = pushDownTuple._1
+ var pushDownDynamicLogicExpression = pushDownTuple._2
+ val valueArray = pushDownTuple._3
+
+ if (!usePushDownColumnFilter) {
+ pushDownDynamicLogicExpression = null
+ }
+
+ logDebug("pushDownRowKeyFilter: " + pushDownRowKeyFilter.ranges)
+ if (pushDownDynamicLogicExpression != null) {
+ logDebug("pushDownDynamicLogicExpression: " +
+ pushDownDynamicLogicExpression.toExpressionString)
+ }
+ logDebug("valueArray: " + valueArray.length)
+
+ val requiredQualifierDefinitionList =
+ new mutable.MutableList[SchemaQualifierDefinition]
- val requiredQualifierDefinitionArray = new mutable.MutableList[SchemaQualifierDefinition]
requiredColumns.foreach( c => {
val definition = schemaMappingDefinition.get(c)
- if (definition.columnFamilyBytes.length > 0) {
- requiredQualifierDefinitionArray += definition
- }
+ requiredQualifierDefinitionList += definition
})
//Create a local variable so that scala doesn't have to
// serialize the whole HBaseRelation Object
val serializableDefinitionMap = schemaMappingDefinition
-
//retain the information for unit testing checks
- DefaultSourceStaticUtils.populateLatestExecutionRules(columnFilterCollection,
- requiredQualifierDefinitionArray)
+ DefaultSourceStaticUtils.populateLatestExecutionRules(pushDownRowKeyFilter,
+ pushDownDynamicLogicExpression)
var resultRDD: RDD[Row] = null
- if (columnFilterCollection != null) {
- val pushDownFilterJava =
- new SparkSQLPushDownFilter(
- columnFilterCollection.generateFamilyQualifiterFilterMap(schemaMappingDefinition))
-
- val getList = new util.ArrayList[Get]()
- val rddList = new util.ArrayList[RDD[Row]]()
-
- val it = columnFilterCollection.columnFilterMap.iterator
- while (it.hasNext) {
- val e = it.next()
- val columnDefinition = schemaMappingDefinition.get(e._1)
- //check is a rowKey
- if (columnDefinition != null && columnDefinition.columnFamily.isEmpty) {
- //add points to getList
- e._2.points.foreach(p => {
- val get = new Get(p)
- requiredQualifierDefinitionArray.foreach( d =>
- get.addColumn(d.columnFamilyBytes, d.qualifierBytes))
- getList.add(get)
- })
+ val getList = new util.ArrayList[Get]()
+ val rddList = new util.ArrayList[RDD[Row]]()
- val rangeIt = e._2.ranges.iterator
+ //add points to getList
+ pushDownRowKeyFilter.points.foreach(p => {
+ val get = new Get(p)
+ requiredQualifierDefinitionList.foreach( d => {
+ if (d.columnFamilyBytes.length > 0)
+ get.addColumn(d.columnFamilyBytes, d.qualifierBytes)
+ })
+ getList.add(get)
+ })
- while (rangeIt.hasNext) {
- val r = rangeIt.next()
+ val rangeIt = pushDownRowKeyFilter.ranges.iterator
- val scan = new Scan()
- scan.setBatch(batchingNum)
- scan.setCaching(cachingNum)
- requiredQualifierDefinitionArray.foreach( d =>
- scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
+ while (rangeIt.hasNext) {
+ val r = rangeIt.next()
- if (pushDownFilterJava.columnFamilyQualifierFilterMap.size() > 0) {
- scan.setFilter(pushDownFilterJava)
- }
+ val scan = new Scan()
+ scan.setBatch(batchingNum)
+ scan.setCaching(cachingNum)
+ requiredQualifierDefinitionList.foreach( d =>
+ if (d.columnFamilyBytes.length > 0)
+ scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
- //Check if there is a lower bound
- if (r.lowerBound != null && r.lowerBound.length > 0) {
-
- if (r.isLowerBoundEqualTo) {
- //HBase startRow is inclusive: Therefore it acts like isLowerBoundEqualTo
- // by default
- scan.setStartRow(r.lowerBound)
- } else {
- //Since we don't equalTo we want the next value we need
- // to add another byte to the start key. That new byte will be
- // the min byte value.
- val newArray = new Array[Byte](r.lowerBound.length + 1)
- System.arraycopy(r.lowerBound, 0, newArray, 0, r.lowerBound.length)
-
- //new Min Byte
- newArray(r.lowerBound.length) = Byte.MinValue
- scan.setStartRow(newArray)
- }
- }
+ if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
+ val pushDownFilterJava =
+ new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
+ valueArray, requiredQualifierDefinitionList)
- //Check if there is a upperBound
- if (r.upperBound != null && r.upperBound.length > 0) {
- if (r.isUpperBoundEqualTo) {
- //HBase stopRow is exclusive: therefore it DOESN'T ast like isUpperBoundEqualTo
- // by default. So we need to add a new max byte to the stopRow key
- val newArray = new Array[Byte](r.upperBound.length + 1)
- System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length)
-
- //New Max Bytes
- newArray(r.upperBound.length) = Byte.MaxValue
-
- scan.setStopRow(newArray)
- } else {
- //Here equalTo is false for Upper bound which is exclusive and
- // HBase stopRow acts like that by default so no need to mutate the
- // rowKey
- scan.setStopRow(r.upperBound)
- }
- }
+ scan.setFilter(pushDownFilterJava)
+ }
- val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
- Row.fromSeq(requiredColumns.map(c =>
- DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r._2)))
- })
- rddList.add(rdd)
- }
+ //Check if there is a lower bound
+ if (r.lowerBound != null && r.lowerBound.length > 0) {
+
+ if (r.isLowerBoundEqualTo) {
+ //HBase startRow is inclusive: Therefore it acts like isLowerBoundEqualTo
+ // by default
+ scan.setStartRow(r.lowerBound)
+ } else {
+ //Since the lower bound is not inclusive we want the next value, so we need
+ // to add another byte to the start key. That new byte will be
+ // the min byte value.
+ val newArray = new Array[Byte](r.lowerBound.length + 1)
+ System.arraycopy(r.lowerBound, 0, newArray, 0, r.lowerBound.length)
+
+ //new Min Byte
+ newArray(r.lowerBound.length) = Byte.MinValue
+ scan.setStartRow(newArray)
}
}
- //If there is more then one RDD then we have to union them together
- for (i <- 0 until rddList.size()) {
- if (resultRDD == null) resultRDD = rddList.get(i)
- else resultRDD = resultRDD.union(rddList.get(i))
-
+ //Check if there is a upperBound
+ if (r.upperBound != null && r.upperBound.length > 0) {
+ if (r.isUpperBoundEqualTo) {
+ //HBase stopRow is exclusive: therefore it DOESN'T act like isUpperBoundEqualTo
+ // by default. So we need to add a new max byte to the stopRow key
+ val newArray = new Array[Byte](r.upperBound.length + 1)
+ System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length)
+
+ //New Max Bytes
+ newArray(r.upperBound.length) = Byte.MaxValue
+ scan.setStopRow(newArray)
+ } else {
+ //Here equalTo is false for Upper bound which is exclusive and
+ // HBase stopRow acts like that by default so no need to mutate the
+ // rowKey
+ scan.setStopRow(r.upperBound)
+ }
}
- //If there are gets then we can get them from the driver and union that rdd in
- // with the rest of the values.
- if (getList.size() > 0) {
- val connection = ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration)
+ val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
+ Row.fromSeq(requiredColumns.map(c =>
+ DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r._2)))
+ })
+ rddList.add(rdd)
+ }
+
+ //If there is more than one RDD then we have to union them together
+ for (i <- 0 until rddList.size()) {
+ if (resultRDD == null) resultRDD = rddList.get(i)
+ else resultRDD = resultRDD.union(rddList.get(i))
+
+ }
+
+ //If there are gets then we can get them from the driver and union that rdd in
+ // with the rest of the values.
+ if (getList.size() > 0) {
+ val connection =
+ ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration)
+ try {
+ val table = connection.getTable(TableName.valueOf(tableName))
try {
- val table = connection.getTable(TableName.valueOf(tableName))
- try {
- val results = table.get(getList)
- val rowList = mutable.MutableList[Row]()
- for (i <- 0 until results.length) {
- val rowArray = requiredColumns.map(c =>
- DefaultSourceStaticUtils.getValue(c, schemaMappingDefinition, results(i)))
- rowList += Row.fromSeq(rowArray)
- }
- val getRDD = sqlContext.sparkContext.parallelize(rowList)
- if (resultRDD == null) resultRDD = getRDD
- else {
- resultRDD = resultRDD.union(getRDD)
- }
- } finally {
- table.close()
+ val results = table.get(getList)
+ val rowList = mutable.MutableList[Row]()
+ for (i <- 0 until results.length) {
+ val rowArray = requiredColumns.map(c =>
+ DefaultSourceStaticUtils.getValue(c, schemaMappingDefinition, results(i)))
+ rowList += Row.fromSeq(rowArray)
+ }
+ val getRDD = sqlContext.sparkContext.parallelize(rowList)
+ if (resultRDD == null) resultRDD = getRDD
+ else {
+ resultRDD = resultRDD.union(getRDD)
}
} finally {
- connection.close()
+ table.close()
}
+ } finally {
+ connection.close()
}
}
+
if (resultRDD == null) {
val scan = new Scan()
scan.setBatch(batchingNum)
scan.setCaching(cachingNum)
- requiredQualifierDefinitionArray.foreach( d =>
+ requiredQualifierDefinitionList.foreach( d =>
scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
@@ -373,79 +385,135 @@ class HBaseRelation (val tableName:String,
resultRDD
}
- /**
- * Root recursive function that will loop over the filters provided by
- * SparkSQL. Some filters are AND or OR functions and contain additional filters
- * hence the need for recursion.
- *
- * @param filters Filters provided by SparkSQL.
- * Filters are joined with the AND operater
- * @return A ColumnFilterCollection whish is a consolidated construct to
- * hold the high level filter information
- */
- def buildColumnFilterCollection(filters: Array[Filter]): ColumnFilterCollection = {
- var superCollection: ColumnFilterCollection = null
+ def buildPushDownPredicatesResource(filters: Array[Filter]):
+ (RowKeyFilter, DynamicLogicExpression, Array[Array[Byte]]) = {
+ var superRowKeyFilter:RowKeyFilter = null
+ val queryValueList = new mutable.MutableList[Array[Byte]]
+ var superDynamicLogicExpression: DynamicLogicExpression = null
filters.foreach( f => {
- val parentCollection = new ColumnFilterCollection
- buildColumnFilterCollection(parentCollection, f)
- if (superCollection == null)
- superCollection = parentCollection
- else
- superCollection.mergeIntersect(parentCollection)
+ val rowKeyFilter = new RowKeyFilter()
+ val logicExpression = transverseFilterTree(rowKeyFilter, queryValueList, f)
+ if (superDynamicLogicExpression == null) {
+ superDynamicLogicExpression = logicExpression
+ superRowKeyFilter = rowKeyFilter
+ } else {
+ superDynamicLogicExpression =
+ new AndLogicExpression(superDynamicLogicExpression, logicExpression)
+ superRowKeyFilter.mergeIntersect(rowKeyFilter)
+ }
+
})
- superCollection
+
+ val queryValueArray = queryValueList.toArray
+
+ if (superRowKeyFilter == null) {
+ superRowKeyFilter = new RowKeyFilter
+ }
+
+ (superRowKeyFilter, superDynamicLogicExpression, queryValueArray)
}
- /**
- * Recursive function that will work to convert Spark Filter
- * objects to ColumnFilterCollection
- *
- * @param parentFilterCollection Parent ColumnFilterCollection
- * @param filter Current given filter from SparkSQL
- */
- def buildColumnFilterCollection(parentFilterCollection:ColumnFilterCollection,
- filter:Filter): Unit = {
+ def transverseFilterTree(parentRowKeyFilter:RowKeyFilter,
+ valueArray:mutable.MutableList[Array[Byte]],
+ filter:Filter): DynamicLogicExpression = {
filter match {
case EqualTo(attr, value) =>
- parentFilterCollection.mergeUnion(attr,
- new ColumnFilter(DefaultSourceStaticUtils.getByteValue(attr,
- schemaMappingDefinition, value.toString)))
-
+ val columnDefinition = schemaMappingDefinition.get(attr)
+ if (columnDefinition != null) {
+ if (columnDefinition.columnFamily.isEmpty) {
+ parentRowKeyFilter.mergeIntersect(new RowKeyFilter(
+ DefaultSourceStaticUtils.getByteValue(attr,
+ schemaMappingDefinition, value.toString), null))
+ }
+ val byteValue =
+ DefaultSourceStaticUtils.getByteValue(attr,
+ schemaMappingDefinition, value.toString)
+ valueArray += byteValue
+ }
+ new EqualLogicExpression(attr, valueArray.length - 1, false)
case LessThan(attr, value) =>
- parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
- new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
- schemaMappingDefinition, value.toString), false,
- new Array[Byte](0), true)))
-
+ val columnDefinition = schemaMappingDefinition.get(attr)
+ if (columnDefinition != null) {
+ if (columnDefinition.columnFamily.isEmpty) {
+ parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
+ new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
+ schemaMappingDefinition, value.toString), false,
+ new Array[Byte](0), true)))
+ }
+ val byteValue =
+ DefaultSourceStaticUtils.getByteValue(attr,
+ schemaMappingDefinition, value.toString)
+ valueArray += byteValue
+ }
+ new LessThanLogicExpression(attr, valueArray.length - 1)
case GreaterThan(attr, value) =>
- parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
- new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
- schemaMappingDefinition, value.toString), false)))
-
+ val columnDefinition = schemaMappingDefinition.get(attr)
+ if (columnDefinition != null) {
+ if (columnDefinition.columnFamily.isEmpty) {
+ parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
+ new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
+ schemaMappingDefinition, value.toString), false)))
+ }
+ val byteValue =
+ DefaultSourceStaticUtils.getByteValue(attr,
+ schemaMappingDefinition, value.toString)
+ valueArray += byteValue
+ }
+ new GreaterThanLogicExpression(attr, valueArray.length - 1)
case LessThanOrEqual(attr, value) =>
- parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
- new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
- schemaMappingDefinition, value.toString), true,
- new Array[Byte](0), true)))
-
+ val columnDefinition = schemaMappingDefinition.get(attr)
+ if (columnDefinition != null) {
+ if (columnDefinition.columnFamily.isEmpty) {
+ parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
+ new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
+ schemaMappingDefinition, value.toString), true,
+ new Array[Byte](0), true)))
+ }
+ val byteValue =
+ DefaultSourceStaticUtils.getByteValue(attr,
+ schemaMappingDefinition, value.toString)
+ valueArray += byteValue
+ }
+ new LessThanOrEqualLogicExpression(attr, valueArray.length - 1)
case GreaterThanOrEqual(attr, value) =>
- parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
- new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
- schemaMappingDefinition, value.toString), true)))
+ val columnDefinition = schemaMappingDefinition.get(attr)
+ if (columnDefinition != null) {
+ if (columnDefinition.columnFamily.isEmpty) {
+ parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
+ new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
+ schemaMappingDefinition, value.toString), true)))
+ }
+ val byteValue =
+ DefaultSourceStaticUtils.getByteValue(attr,
+ schemaMappingDefinition, value.toString)
+ valueArray += byteValue
+ }
+ new GreaterThanOrEqualLogicExpression(attr, valueArray.length - 1)
case Or(left, right) =>
- buildColumnFilterCollection(parentFilterCollection, left)
- val rightSideCollection = new ColumnFilterCollection
- buildColumnFilterCollection(rightSideCollection, right)
- parentFilterCollection.mergeUnion(rightSideCollection)
+ val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
+ val rightSideRowKeyFilter = new RowKeyFilter
+ val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
+
+ parentRowKeyFilter.mergeUnion(rightSideRowKeyFilter)
+
+ new OrLogicExpression(leftExpression, rightExpression)
case And(left, right) =>
- buildColumnFilterCollection(parentFilterCollection, left)
- val rightSideCollection = new ColumnFilterCollection
- buildColumnFilterCollection(rightSideCollection, right)
- parentFilterCollection.mergeIntersect(rightSideCollection)
- case _ => //nothing
+
+ val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
+ val rightSideRowKeyFilter = new RowKeyFilter
+ val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
+ parentRowKeyFilter.mergeIntersect(rightSideRowKeyFilter)
+
+ new AndLogicExpression(leftExpression, rightExpression)
+ case IsNull(attr) =>
+ new IsNullLogicExpression(attr, false)
+ case IsNotNull(attr) =>
+ new IsNullLogicExpression(attr, true)
+ case _ =>
+ new PassThroughLogicExpression
}
}
}
@@ -472,7 +540,7 @@ case class SchemaQualifierDefinition(columnName:String,
else if (colType.equals("DOUBLE")) DoubleType
else if (colType.equals("STRING")) StringType
else if (colType.equals("TIMESTAMP")) TimestampType
- else if (colType.equals("DECIMAL")) StringType //DataTypes.createDecimalType(precision, scale)
+ else if (colType.equals("DECIMAL")) StringType
else throw new IllegalArgumentException("Unsupported column type :" + colType)
}
@@ -524,11 +592,11 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
isLowerBoundEqualTo = if (lowerBoundCompare == 0)
isLowerBoundEqualTo || other.isLowerBoundEqualTo
- else isLowerBoundEqualTo
+ else if (lowerBoundCompare < 0) isLowerBoundEqualTo else other.isLowerBoundEqualTo
isUpperBoundEqualTo = if (upperBoundCompare == 0)
isUpperBoundEqualTo || other.isUpperBoundEqualTo
- else isUpperBoundEqualTo
+ else if (upperBoundCompare < 0) other.isUpperBoundEqualTo else isUpperBoundEqualTo
}
/**
@@ -551,14 +619,15 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
* @param other Other scan object
* @return True is overlap false is not overlap
*/
- def doesOverLap(other:ScanRange): Boolean = {
+ def getOverLapScanRange(other:ScanRange): ScanRange = {
var leftRange:ScanRange = null
var rightRange:ScanRange = null
//First identify the Left range
// Also lower bound can't be null
- if (Bytes.compareTo(lowerBound, other.lowerBound) <=0) {
+ if (compareRange(lowerBound, other.lowerBound) < 0 ||
+ compareRange(upperBound, other.upperBound) < 0) {
leftRange = this
rightRange = other
} else {
@@ -568,8 +637,12 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
//Then see if leftRange goes to null or if leftRange.upperBound
// upper is greater or equals to rightRange.lowerBound
- leftRange.upperBound == null ||
- Bytes.compareTo(leftRange.upperBound, rightRange.lowerBound) >= 0
+ if (leftRange.upperBound == null ||
+ Bytes.compareTo(leftRange.upperBound, rightRange.lowerBound) >= 0) {
+ new ScanRange(leftRange.upperBound, leftRange.isUpperBoundEqualTo, rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
+ } else {
+ null
+ }
}
/**
@@ -586,9 +659,25 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
else if (left != null && right == null) -1
else Bytes.compareTo(left, right)
}
+
+ /**
+ * Tests whether a point falls inside this range, honoring the inclusivity of each bound
+ * @return true if the point is within the lower and upper bounds, false otherwise
+ */
+ def containsPoint(point:Array[Byte]): Boolean = {
+ val lowerCompare = compareRange(point, lowerBound)
+ val upperCompare = compareRange(point, upperBound)
+ //A bound flagged as "equal to" is inclusive (>= / <=), otherwise it is exclusive (> / <)
+ ((isLowerBoundEqualTo && lowerCompare >= 0) ||
+ (!isLowerBoundEqualTo && lowerCompare > 0)) &&
+ ((isUpperBoundEqualTo && upperCompare <= 0) ||
+ (!isUpperBoundEqualTo && upperCompare < 0))
+
+ }
override def toString:String = {
- "ScanRange:(" + Bytes.toString(upperBound) + "," + isUpperBoundEqualTo + "," +
- Bytes.toString(lowerBound) + "," + isLowerBoundEqualTo + ")"
+ "ScanRange:(upperBound:" + Bytes.toString(upperBound) +
+ ",isUpperBoundEqualTo:" + isUpperBoundEqualTo + ",lowerBound:" +
+ Bytes.toString(lowerBound) + ",isLowerBoundEqualTo:" + isLowerBoundEqualTo + ")"
}
}
@@ -663,7 +752,7 @@ class ColumnFilter (currentPoint:Array[Byte] = null,
other.ranges.foreach( otherR => {
var doesOverLap = false
ranges.foreach{ r =>
- if (r.doesOverLap(otherR)) {
+ if (r.getOverLapScanRange(otherR) != null) {
r.mergeUnion(otherR)
doesOverLap = true
}}
@@ -692,7 +781,7 @@ class ColumnFilter (currentPoint:Array[Byte] = null,
other.ranges.foreach( otherR => {
ranges.foreach( r => {
- if (r.doesOverLap(otherR)) {
+ if (r.getOverLapScanRange(otherR) != null) {
r.mergeIntersect(otherR)
survivingRanges += r
}
@@ -842,7 +931,8 @@ object DefaultSourceStaticUtils {
getFreshByteRange(bytes, 0, bytes.length)
}
- def getFreshByteRange(bytes:Array[Byte], offset:Int = 0, length:Int): PositionedByteRange = {
+ def getFreshByteRange(bytes:Array[Byte], offset:Int = 0, length:Int):
+ PositionedByteRange = {
byteRange.get().set(bytes).setLength(length).setOffset(offset)
}
@@ -856,14 +946,13 @@ object DefaultSourceStaticUtils {
* This method is to populate the lastFiveExecutionRules for unit test perposes
* This method is not thread safe.
*
- * @param columnFilterCollection The filters in the last job
- * @param requiredQualifierDefinitionArray The required columns in the last job
+ * @param rowKeyFilter The rowKey Filter logic used in the last query
+ * @param dynamicLogicExpression The dynamicLogicExpression used in the last query
*/
- def populateLatestExecutionRules(columnFilterCollection: ColumnFilterCollection,
- requiredQualifierDefinitionArray:
- mutable.MutableList[SchemaQualifierDefinition]):Unit = {
+ def populateLatestExecutionRules(rowKeyFilter: RowKeyFilter,
+ dynamicLogicExpression: DynamicLogicExpression):Unit = {
lastFiveExecutionRules.add(new ExecutionRuleForUnitTesting(
- columnFilterCollection, requiredQualifierDefinitionArray))
+ rowKeyFilter, dynamicLogicExpression))
while (lastFiveExecutionRules.size() > 5) {
lastFiveExecutionRules.poll()
}
@@ -977,6 +1066,162 @@ object DefaultSourceStaticUtils {
}
}
-class ExecutionRuleForUnitTesting(val columnFilterCollection: ColumnFilterCollection,
- val requiredQualifierDefinitionArray:
- mutable.MutableList[SchemaQualifierDefinition])
+/**
+ * Contains the filter information related to the row key of a query.
+ * This can contain many ranges or points.
+ *
+ * @param currentPoint the initial point when the filter is created
+ * @param currentRange the initial scanRange when the filter is created
+ */
+class RowKeyFilter (currentPoint:Array[Byte] = null,
+                    currentRange:ScanRange =
+                    new ScanRange(null, true, new Array[Byte](0), true),
+                    var points:mutable.MutableList[Array[Byte]] =
+                    new mutable.MutableList[Array[Byte]](),
+                    var ranges:mutable.MutableList[ScanRange] =
+                    new mutable.MutableList[ScanRange]() ) extends Serializable {
+  //Collection of ranges
+  if (currentRange != null ) ranges.+=(currentRange)
+
+  //Collection of points
+  if (currentPoint != null) points.+=(currentPoint)
+
+  /**
+   * This will validate a given value through the filter's points and/or ranges
+   * and report whether the value passed the filter
+   *
+   * @param value Value to be validated
+   * @param valueOffSet The offset of the value
+   * @param valueLength The length of the value
+   * @return True if the value passes the filter, false if not
+   */
+  def validate(value:Array[Byte], valueOffSet:Int, valueLength:Int):Boolean = {
+    var result = false
+
+    points.foreach( p => {
+      if (Bytes.equals(p, 0, p.length, value, valueOffSet, valueLength)) {
+        result = true
+      }
+    })
+
+    ranges.foreach( r => {
+      val upperBoundPass = r.upperBound == null ||
+        (r.isUpperBoundEqualTo &&
+          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
+            value, valueOffSet, valueLength) >= 0) ||
+        (!r.isUpperBoundEqualTo &&
+          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
+            value, valueOffSet, valueLength) > 0)
+      //A null or empty lower bound is unbounded; otherwise the value must not sort before it
+      val lowerBoundPass = r.lowerBound == null || r.lowerBound.length == 0 ||
+        (r.isLowerBoundEqualTo &&
+          Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
+            value, valueOffSet, valueLength) <= 0) ||
+        (!r.isLowerBoundEqualTo &&
+          Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
+            value, valueOffSet, valueLength) < 0)
+
+      result = result || (upperBoundPass && lowerBoundPass)
+    })
+    result
+  }
+
+  /**
+   * This will allow us to merge filter logic that is joined to the existing filter
+   * through an OR operator
+   *
+   * @param other Filter to merge
+   */
+  def mergeUnion(other:RowKeyFilter): Unit = {
+    other.points.foreach( p => points += p)
+
+    other.ranges.foreach( otherR => {
+      var doesOverLap = false
+      ranges.foreach{ r =>
+        if (r.getOverLapScanRange(otherR) != null) {
+          r.mergeUnion(otherR)
+          doesOverLap = true
+        }}
+      if (!doesOverLap) ranges.+=(otherR)
+    })
+  }
+
+  /**
+   * This will allow us to merge filter logic that is joined to the existing filter
+   * through an AND operator
+   *
+   * @param other Filter to merge
+   */
+  def mergeIntersect(other:RowKeyFilter): Unit = {
+    val survivingPoints = new mutable.MutableList[Array[Byte]]()
+    val didntSurviveFirstPassPoints = new mutable.MutableList[Array[Byte]]()
+    if (points == null || points.length == 0) {
+      other.points.foreach( otherP => {
+        didntSurviveFirstPassPoints += otherP
+      })
+    } else {
+      points.foreach(p => {
+        if (other.points.length == 0) {
+          didntSurviveFirstPassPoints += p
+        } else {
+          other.points.foreach(otherP => {
+            if (Bytes.equals(p, otherP)) {
+              survivingPoints += p
+            } else {
+              didntSurviveFirstPassPoints += p
+            }
+          })
+        }
+      })
+    }
+
+    val survivingRanges = new mutable.MutableList[ScanRange]()
+
+    if (ranges.length == 0) {
+      didntSurviveFirstPassPoints.foreach(p => {
+        survivingPoints += p
+      })
+    } else {
+      ranges.foreach(r => {
+        other.ranges.foreach(otherR => {
+          val overLapScanRange = r.getOverLapScanRange(otherR)
+          if (overLapScanRange != null) {
+            survivingRanges += overLapScanRange
+          }
+        })
+        didntSurviveFirstPassPoints.foreach(p => {
+          if (r.containsPoint(p)) {
+            survivingPoints += p
+          }
+        })
+      })
+    }
+    points = survivingPoints
+    ranges = survivingRanges
+  }
+
+  override def toString:String = {
+    val strBuilder = new StringBuilder
+    strBuilder.append("(points:(")
+    var isFirst = true
+    points.foreach( p => {
+      if (isFirst) isFirst = false
+      else strBuilder.append(",")
+      strBuilder.append(Bytes.toString(p))
+    })
+    strBuilder.append("),ranges:")
+    isFirst = true
+    ranges.foreach( r => {
+      if (isFirst) isFirst = false
+      else strBuilder.append(",")
+      strBuilder.append(r)
+    })
+    strBuilder.append("))")
+    strBuilder.toString()
+  }
+}
+
+/** Retains the row key filter and dynamic logic expression used by a query so that
+ *  unit tests can inspect them (instances are created in populateLatestExecutionRules). */
+class ExecutionRuleForUnitTesting(val rowKeyFilter: RowKeyFilter,
+ val dynamicLogicExpression: DynamicLogicExpression)
http://git-wip-us.apache.org/repos/asf/hbase/blob/dae1775a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
new file mode 100644
index 0000000..fa61860
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.util
+
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Dynamic logic for SQL push down. There is an implementation for the most
+ * common operations and a pass through for operations not covered here.
+ *
+ * Logic can be nested with And or Or operators.
+ *
+ * A logic tree can be written out as a string and reconstructed from that string.
+ */
+trait DynamicLogicExpression {
+  /**
+   * Evaluates this expression.
+   *
+   * @param columnToCurrentRowValueMap column name to the value held for that column
+   * @param valueFromQueryValueArray   query values; leaf expressions look their value
+   *                                   up in this array by index
+   * @return true when the expression is satisfied
+   */
+  def execute(columnToCurrentRowValueMap: util.HashMap[String, ByteArrayComparable],
+              valueFromQueryValueArray:Array[Array[Byte]]): Boolean
+
+  /**
+   * Writes this expression out as a string by delegating to appendToExpression.
+   *
+   * @return the string form of this expression
+   */
+  def toExpressionString: String = {
+    val strBuilder = new StringBuilder
+    appendToExpression(strBuilder)
+    strBuilder.toString()
+  }
+
+  /**
+   * Appends the string form of this expression to the given builder.
+   *
+   * The result type is declared explicitly: procedure syntax (a declaration
+   * without a result type) is deprecated, and every implementation already
+   * declares Unit.
+   *
+   * @param strBuilder builder that receives the text
+   */
+  def appendToExpression(strBuilder:StringBuilder): Unit
+}
+
+/**
+ * Logical AND of two child expressions.
+ *
+ * @param leftExpression  evaluated first
+ * @param rightExpression evaluated only when the left expression is true
+ */
+class AndLogicExpression (val leftExpression:DynamicLogicExpression,
+                          val rightExpression:DynamicLogicExpression)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    // Short circuit: the right side only runs when the left side passes.
+    if (leftExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)) {
+      rightExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)
+    } else {
+      false
+    }
+  }
+
+  /** Appends "( left AND right )" with single spaces between the tokens. */
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder
+      .append("( ")
+      .append(leftExpression.toExpressionString)
+      .append(" AND ")
+      .append(rightExpression.toExpressionString)
+      .append(" )")
+  }
+}
+
+/**
+ * Logical OR of two child expressions.
+ *
+ * @param leftExpression  evaluated first
+ * @param rightExpression evaluated only when the left expression is false
+ */
+class OrLogicExpression (val leftExpression:DynamicLogicExpression,
+                         val rightExpression:DynamicLogicExpression)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    // Short circuit: the right side only runs when the left side fails.
+    if (leftExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)) {
+      true
+    } else {
+      rightExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray)
+    }
+  }
+
+  /** Appends "( left OR right )" with single spaces between the tokens. */
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder
+      .append("( ")
+      .append(leftExpression.toExpressionString)
+      .append(" OR ")
+      .append(rightExpression.toExpressionString)
+      .append(" )")
+  }
+}
+
+/**
+ * Leaf expression testing a column for byte equality (or inequality) with a
+ * query value. A column with no value never matches, whatever isNot is.
+ *
+ * @param columnName          column to look up in the row value map
+ * @param valueFromQueryIndex index of the query value to compare against
+ * @param isNot               true for "!=", false for "=="
+ */
+class EqualLogicExpression (val columnName:String,
+                            val valueFromQueryIndex:Int,
+                            val isNot:Boolean) extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val rowValue = columnToCurrentRowValueMap.get(columnName)
+    val queryValue = valueFromQueryValueArray(valueFromQueryIndex)
+
+    if (rowValue == null) {
+      false
+    } else {
+      val isSame = Bytes.equals(queryValue, 0, queryValue.length,
+        rowValue.bytes, rowValue.offset, rowValue.length)
+      // Flip the outcome for the "!=" form.
+      isSame != isNot
+    }
+  }
+
+  /** Appends "column == index" or "column != index". */
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    val operator = if (isNot) "!=" else "=="
+    strBuilder.append(s"$columnName $operator $valueFromQueryIndex")
+  }
+}
+
+/**
+ * Leaf expression testing whether the row value map holds a value for a column.
+ *
+ * @param columnName column to look up in the row value map
+ * @param isNot      true for "isNotNull", false for "isNull"
+ */
+class IsNullLogicExpression (val columnName:String,
+                             val isNot:Boolean) extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val rowValue = columnToCurrentRowValueMap.get(columnName)
+
+    if (isNot) rowValue != null else rowValue == null
+  }
+
+  /** Appends "column isNull" or "column isNotNull". */
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    val keyword = if (isNot) "isNotNull" else "isNull"
+    strBuilder.append(s"$columnName $keyword")
+  }
+}
+
+/**
+ * Leaf expression: true when the column has a value and Bytes.compareTo ranks
+ * it above the query value.
+ *
+ * @param columnName          column to look up in the row value map
+ * @param valueFromQueryIndex index of the query value to compare against
+ */
+class GreaterThanLogicExpression (val columnName:String,
+                                  val valueFromQueryIndex:Int)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val rowValue = columnToCurrentRowValueMap.get(columnName)
+    val queryValue = valueFromQueryValueArray(valueFromQueryIndex)
+
+    // A column without a value never satisfies the comparison.
+    if (rowValue == null) {
+      false
+    } else {
+      Bytes.compareTo(rowValue.bytes, rowValue.offset, rowValue.length,
+        queryValue, 0, queryValue.length) > 0
+    }
+  }
+
+  /** Appends "column > index". */
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append(s"$columnName > $valueFromQueryIndex")
+  }
+}
+
+/**
+ * Leaf expression: true when the column has a value and Bytes.compareTo ranks
+ * it above or equal to the query value.
+ *
+ * @param columnName          column to look up in the row value map
+ * @param valueFromQueryIndex index of the query value to compare against
+ */
+class GreaterThanOrEqualLogicExpression (val columnName:String,
+                                         val valueFromQueryIndex:Int)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val rowValue = columnToCurrentRowValueMap.get(columnName)
+    val queryValue = valueFromQueryValueArray(valueFromQueryIndex)
+
+    // A column without a value never satisfies the comparison.
+    if (rowValue == null) {
+      false
+    } else {
+      Bytes.compareTo(rowValue.bytes, rowValue.offset, rowValue.length,
+        queryValue, 0, queryValue.length) >= 0
+    }
+  }
+
+  /** Appends "column >= index". */
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append(s"$columnName >= $valueFromQueryIndex")
+  }
+}
+
+/**
+ * Leaf expression: true when the column has a value and Bytes.compareTo ranks
+ * it below the query value.
+ *
+ * @param columnName          column to look up in the row value map
+ * @param valueFromQueryIndex index of the query value to compare against
+ */
+class LessThanLogicExpression (val columnName:String,
+                               val valueFromQueryIndex:Int)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val rowValue = columnToCurrentRowValueMap.get(columnName)
+    val queryValue = valueFromQueryValueArray(valueFromQueryIndex)
+
+    // A column without a value never satisfies the comparison.
+    if (rowValue == null) {
+      false
+    } else {
+      Bytes.compareTo(rowValue.bytes, rowValue.offset, rowValue.length,
+        queryValue, 0, queryValue.length) < 0
+    }
+  }
+
+  /** Appends "column < index". */
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append(s"$columnName < $valueFromQueryIndex")
+  }
+}
+
+/**
+ * Leaf expression: true when the column has a value and Bytes.compareTo ranks
+ * it below or equal to the query value.
+ *
+ * @param columnName          column to look up in the row value map
+ * @param valueFromQueryIndex index of the query value to compare against
+ */
+class LessThanOrEqualLogicExpression (val columnName:String,
+                                      val valueFromQueryIndex:Int)
+  extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val rowValue = columnToCurrentRowValueMap.get(columnName)
+    val queryValue = valueFromQueryValueArray(valueFromQueryIndex)
+
+    // A column without a value never satisfies the comparison.
+    if (rowValue == null) {
+      false
+    } else {
+      Bytes.compareTo(rowValue.bytes, rowValue.offset, rowValue.length,
+        queryValue, 0, queryValue.length) <= 0
+    }
+  }
+
+  /** Appends "column <= index". */
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append(s"$columnName <= $valueFromQueryIndex")
+  }
+}
+
+/**
+ * Expression that is always satisfied; it ignores both the row values and the
+ * query values. Its string form is the single token "Pass".
+ */
+class PassThroughLogicExpression() extends DynamicLogicExpression {
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray: Array[Array[Byte]]): Boolean = {
+    true
+  }
+
+  /** Appends the literal token "Pass". */
+  override def appendToExpression(strBuilder: StringBuilder): Unit =
+    strBuilder.append("Pass")
+}
+
+/**
+ * Rebuilds a DynamicLogicExpression tree from the space separated text written
+ * by toExpressionString.
+ *
+ * Grammar, one token per space separated word:
+ *   expression := "(" expression ("AND" | "OR") expression ")"
+ *               | column ("<" | "<=" | ">" | ">=" | "==" | "!=") valueIndex
+ *               | column ("isNull" | "isNotNull")
+ *               | "Pass"
+ */
+object DynamicLogicExpressionBuilder {
+  /**
+   * Parses a whole expression string.
+   *
+   * @param expressionString text in the format written by toExpressionString
+   * @return the root of the reconstructed expression tree
+   * @throws IllegalArgumentException when a gate or command is not recognised
+   */
+  def build(expressionString:String): DynamicLogicExpression = {
+    val expressionAndOffset = build(expressionString.split(' '), 0)
+    expressionAndOffset._1
+  }
+
+  /**
+   * Parses the expression that starts at offSet.
+   *
+   * @param expressionArray the tokens of the whole expression
+   * @param offSet          index of the first token of the expression to parse
+   * @return the parsed expression and the index of the first token after it
+   */
+  private def build(expressionArray:Array[String],
+                    offSet:Int): (DynamicLogicExpression, Int) = {
+    if (expressionArray(offSet).equals("(")) {
+      val (left, gateOffSet) = build(expressionArray, offSet + 1)
+      val gate = expressionArray(gateOffSet)
+      // Reject a bad gate before trying to parse whatever follows it.
+      if (!gate.equals("AND") && !gate.equals("OR")) {
+        throw new IllegalArgumentException("Unknown gate:" + gate)
+      }
+      val (right, closeOffSet) = build(expressionArray, gateOffSet + 1)
+      val combined: DynamicLogicExpression =
+        if (gate.equals("AND")) new AndLogicExpression(left, right)
+        else new OrLogicExpression(left, right)
+      // closeOffSet points at the closing ")"; step over it.
+      (combined, closeOffSet + 1)
+    } else {
+      val firstToken = expressionArray(offSet)
+      // The pass through expression is written as the single token "Pass", so
+      // there may be no token after it when it is the whole expression.
+      val command =
+        if (offSet + 1 < expressionArray.length) expressionArray(offSet + 1) else ""
+      def valueIndex: Int = expressionArray(offSet + 2).toInt
+
+      command match {
+        case "<" =>
+          (new LessThanLogicExpression(firstToken, valueIndex), offSet + 3)
+        case "<=" =>
+          (new LessThanOrEqualLogicExpression(firstToken, valueIndex), offSet + 3)
+        case ">" =>
+          (new GreaterThanLogicExpression(firstToken, valueIndex), offSet + 3)
+        case ">=" =>
+          (new GreaterThanOrEqualLogicExpression(firstToken, valueIndex), offSet + 3)
+        case "==" =>
+          (new EqualLogicExpression(firstToken, valueIndex, false), offSet + 3)
+        case "!=" =>
+          (new EqualLogicExpression(firstToken, valueIndex, true), offSet + 3)
+        case "isNull" =>
+          (new IsNullLogicExpression(firstToken, false), offSet + 2)
+        case "isNotNull" =>
+          (new IsNullLogicExpression(firstToken, true), offSet + 2)
+        case "Pass" =>
+          // Two token form accepted by earlier versions of this parser.
+          (new PassThroughLogicExpression, offSet + 2)
+        case _ if firstToken.equals("Pass") =>
+          // Single token form written by PassThroughLogicExpression. It is
+          // checked last so a column that happens to be named "Pass" still
+          // parses as a normal comparison.
+          (new PassThroughLogicExpression, offSet + 1)
+        case _ =>
+          throw new IllegalArgumentException("Unknown logic command:" + command)
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/dae1775a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index fb475b0..2cee3a8 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -29,7 +29,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
@transient var sc: SparkContext = null
var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
- val tableName = "t1"
+ val t1TableName = "t1"
+ val t2TableName = "t2"
val columnFamily = "c"
var sqlContext:SQLContext = null
@@ -41,50 +42,94 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
logInfo(" - minicluster started")
try
- TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+ TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
catch {
- case e: Exception => logInfo(" - no table " + tableName + " found")
-
+ case e: Exception => logInfo(" - no table " + t1TableName + " found")
}
- logInfo(" - creating table " + tableName)
- TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
+ try
+ TEST_UTIL.deleteTable(TableName.valueOf(t2TableName))
+ catch {
+ case e: Exception => logInfo(" - no table " + t2TableName + " found")
+ }
+ logInfo(" - creating table " + t1TableName)
+ TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily))
+ logInfo(" - created table")
+ logInfo(" - creating table " + t2TableName)
+ TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
sc = new SparkContext("local", "test")
val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
- val table = connection.getTable(TableName.valueOf("t1"))
-
try {
- var put = new Put(Bytes.toBytes("get1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
- table.put(put)
- put = new Put(Bytes.toBytes("get2"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
- table.put(put)
- put = new Put(Bytes.toBytes("get3"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
- table.put(put)
- put = new Put(Bytes.toBytes("get4"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
- table.put(put)
- put = new Put(Bytes.toBytes("get5"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
- table.put(put)
+ val t1Table = connection.getTable(TableName.valueOf("t1"))
+
+ try {
+ var put = new Put(Bytes.toBytes("get1"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
+ t1Table.put(put)
+ put = new Put(Bytes.toBytes("get2"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
+ t1Table.put(put)
+ put = new Put(Bytes.toBytes("get3"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+ t1Table.put(put)
+ put = new Put(Bytes.toBytes("get4"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
+ t1Table.put(put)
+ put = new Put(Bytes.toBytes("get5"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+ t1Table.put(put)
+ } finally {
+ t1Table.close()
+ }
+
+ val t2Table = connection.getTable(TableName.valueOf("t2"))
+
+ try {
+ var put = new Put(Bytes.toBytes(1))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
+ t2Table.put(put)
+ put = new Put(Bytes.toBytes(2))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
+ t2Table.put(put)
+ put = new Put(Bytes.toBytes(3))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+ t2Table.put(put)
+ put = new Put(Bytes.toBytes(4))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
+ t2Table.put(put)
+ put = new Put(Bytes.toBytes(5))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+ t2Table.put(put)
+ } finally {
+ t2Table.close()
+ }
} finally {
- table.close()
connection.close()
}
@@ -98,23 +143,36 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
"hbase.batching.num" -> "100",
"cachingNum" -> "100"))
- df.registerTempTable("hbaseTmp")
+ df.registerTempTable("hbaseTable1")
+
+ df = sqlContext.load("org.apache.hadoop.hbase.spark",
+ Map("hbase.columns.mapping" ->
+ "KEY_FIELD INT :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
+ "hbase.table" -> "t2",
+ "hbase.batching.num" -> "100",
+ "cachingNum" -> "100"))
+
+ df.registerTempTable("hbaseTable2")
}
override def afterAll() {
- TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+ TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
logInfo("shuting down minicluster")
TEST_UTIL.shutdownMiniCluster()
sc.stop()
}
+ override def beforeEach(): Unit = {
+ DefaultSourceStaticUtils.lastFiveExecutionRules.clear()
+ }
+
/**
* A example of query three fields and also only using rowkey points for the filter
*/
test("Test rowKey point only rowKey query") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')").take(10)
@@ -122,23 +180,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(results.length == 3)
- assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
- val keyFieldFilter =
- executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
- assert(keyFieldFilter.ranges.length == 0)
- assert(keyFieldFilter.points.length == 3)
- assert(Bytes.toString(keyFieldFilter.points.head).equals("get1"))
- assert(Bytes.toString(keyFieldFilter.points(1)).equals("get2"))
- assert(Bytes.toString(keyFieldFilter.points(2)).equals("get3"))
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( ( KEY_FIELD == 0 OR KEY_FIELD == 1 ) OR KEY_FIELD == 2 )"))
- assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+ assert(executionRules.rowKeyFilter.points.size == 3)
+ assert(executionRules.rowKeyFilter.ranges.size == 0)
}
/**
* A example of query three fields and also only using cell points for the filter
*/
test("Test cell point only rowKey query") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"(B_FIELD = '4' or B_FIELD = '10' or A_FIELD = 'foo1')").take(10)
@@ -146,17 +199,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(results.length == 3)
- assert(executionRules.columnFilterCollection.columnFilterMap.size == 2)
- val bFieldFilter =
- executionRules.columnFilterCollection.columnFilterMap.get("B_FIELD").get
- assert(bFieldFilter.ranges.length == 0)
- assert(bFieldFilter.points.length == 2)
- val aFieldFilter =
- executionRules.columnFilterCollection.columnFilterMap.get("A_FIELD").get
- assert(aFieldFilter.ranges.length == 0)
- assert(aFieldFilter.points.length == 1)
-
- assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( ( B_FIELD == 0 OR B_FIELD == 1 ) OR A_FIELD == 2 )"))
}
/**
@@ -164,7 +208,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
* Also an example of less then and greater then
*/
test("Test two range rowKey query") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"( KEY_FIELD < 'get2' or KEY_FIELD > 'get3')").take(10)
@@ -172,13 +216,88 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(results.length == 3)
- assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
- val keyFieldFilter =
- executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
- assert(keyFieldFilter.ranges.length == 2)
- assert(keyFieldFilter.points.length == 0)
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
+
+ assert(executionRules.rowKeyFilter.points.size == 0)
+ assert(executionRules.rowKeyFilter.ranges.size == 2)
+
+ val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+ assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+ assert(Bytes.equals(scanRange1.upperBound,Bytes.toBytes("get2")))
+ assert(scanRange1.isLowerBoundEqualTo)
+ assert(!scanRange1.isUpperBoundEqualTo)
+
+ val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
+ assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes("get3")))
+ assert(scanRange2.upperBound == null)
+ assert(!scanRange2.isLowerBoundEqualTo)
+ assert(scanRange2.isUpperBoundEqualTo)
+ }
+
+ /**
+ * An example of an OR merge between two ranges; the result is one range.
+ * Also an example of less than and greater than.
+ *
+ * This example makes sure the code works for an int rowKey.
+ */
+ test("Test two range rowKey query where the rowKey is Int and there is a range over lap") {
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
+ "WHERE " +
+ "( KEY_FIELD < 4 or KEY_FIELD > 2)").take(10)
+
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+
+
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
+
+ assert(executionRules.rowKeyFilter.points.size == 0)
+ assert(executionRules.rowKeyFilter.ranges.size == 1)
+
+ val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+ assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+ assert(scanRange1.upperBound == null)
+ assert(scanRange1.isLowerBoundEqualTo)
+ assert(scanRange1.isUpperBoundEqualTo)
+
+ assert(results.length == 5)
+ }
+
+ /**
+ * An example of an OR merge between two ranges; the result is two ranges.
+ * Also an example of less than and greater than.
+ *
+ * This example makes sure the code works for an int rowKey.
+ */
+ test("Test two range rowKey query where the rowKey is Int and the ranges don't over lap") {
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
+ "WHERE " +
+ "( KEY_FIELD < 2 or KEY_FIELD > 4)").take(10)
+
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
+
+ assert(executionRules.rowKeyFilter.points.size == 0)
+
+ assert(executionRules.rowKeyFilter.ranges.size == 2)
- assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+ val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+ assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+ assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
+ assert(scanRange1.isLowerBoundEqualTo)
+ assert(!scanRange1.isUpperBoundEqualTo)
+
+ val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
+ assert(Bytes.equals(scanRange2.lowerBound, Bytes.toBytes(4)))
+ assert(scanRange2.upperBound == null)
+ assert(!scanRange2.isLowerBoundEqualTo)
+ assert(scanRange2.isUpperBoundEqualTo)
+
+ assert(results.length == 2)
}
/**
@@ -186,7 +305,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
* Also an example of less then and equal to and greater then and equal to
*/
test("Test one combined range rowKey query") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
@@ -194,13 +313,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(results.length == 2)
- assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
- val keyFieldFilter =
- executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
- assert(keyFieldFilter.ranges.length == 1)
- assert(keyFieldFilter.points.length == 0)
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( KEY_FIELD <= 0 AND KEY_FIELD >= 1 )"))
+
+ assert(executionRules.rowKeyFilter.points.size == 0)
+ assert(executionRules.rowKeyFilter.ranges.size == 1)
+
+ val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+ assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get2")))
+ assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get3")))
+ assert(scanRange1.isLowerBoundEqualTo)
+ assert(scanRange1.isUpperBoundEqualTo)
- assert(executionRules.requiredQualifierDefinitionArray.length == 2)
}
/**
@@ -211,10 +335,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
val results = df.select("KEY_FIELD").take(10)
assert(results.length == 5)
-
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
- assert(executionRules.columnFilterCollection == null)
- assert(executionRules.requiredQualifierDefinitionArray.length == 0)
+
+ assert(executionRules.dynamicLogicExpression == null)
}
@@ -223,38 +346,28 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
* rowKey and the a column
*/
test("Test SQL point and range combo") {
- val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTmp " +
+ val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTable1 " +
"WHERE " +
"(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
"(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
- assert(results.length == 3)
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( ( KEY_FIELD == 0 AND B_FIELD < 1 ) OR " +
+ "( KEY_FIELD >= 2 AND B_FIELD == 3 ) )"))
+
+ assert(executionRules.rowKeyFilter.points.size == 1)
+ assert(executionRules.rowKeyFilter.ranges.size == 1)
- assert(executionRules.columnFilterCollection.columnFilterMap.size == 2)
- val keyFieldFilter =
- executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
- assert(keyFieldFilter.ranges.length == 1)
- assert(keyFieldFilter.ranges.head.upperBound == null)
- assert(Bytes.toString(keyFieldFilter.ranges.head.lowerBound).equals("get3"))
- assert(keyFieldFilter.ranges.head.isLowerBoundEqualTo)
- assert(keyFieldFilter.points.length == 1)
- assert(Bytes.toString(keyFieldFilter.points.head).equals("get1"))
-
- val bFieldFilter =
- executionRules.columnFilterCollection.columnFilterMap.get("B_FIELD").get
- assert(bFieldFilter.ranges.length == 1)
- assert(bFieldFilter.ranges.head.lowerBound.length == 0)
- assert(Bytes.toString(bFieldFilter.ranges.head.upperBound).equals("3"))
- assert(!bFieldFilter.ranges.head.isUpperBoundEqualTo)
- assert(bFieldFilter.points.length == 1)
- assert(Bytes.toString(bFieldFilter.points.head).equals("8"))
-
- assert(executionRules.requiredQualifierDefinitionArray.length == 1)
- assert(executionRules.requiredQualifierDefinitionArray.head.columnName.equals("B_FIELD"))
- assert(executionRules.requiredQualifierDefinitionArray.head.columnFamily.equals("c"))
- assert(executionRules.requiredQualifierDefinitionArray.head.qualifier.equals("b"))
+ val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+ assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get3")))
+ assert(scanRange1.upperBound == null)
+ assert(scanRange1.isLowerBoundEqualTo)
+ assert(scanRange1.isUpperBoundEqualTo)
+
+
+ assert(results.length == 3)
}
/**
@@ -262,46 +375,145 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
*/
test("Test two complete range non merge rowKey query") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
"WHERE " +
- "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" +
- "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10)
+ "( KEY_FIELD >= 1 and KEY_FIELD <= 2) or" +
+ "( KEY_FIELD > 3 and KEY_FIELD <= 5)").take(10)
+
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 4)
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
+ "( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"))
+
+ assert(executionRules.rowKeyFilter.points.size == 0)
+ assert(executionRules.rowKeyFilter.ranges.size == 2)
- assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
- val keyFieldFilter =
- executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
- assert(keyFieldFilter.ranges.length == 2)
- assert(keyFieldFilter.points.length == 0)
+ val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+ assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes(1)))
+ assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
+ assert(scanRange1.isLowerBoundEqualTo)
+ assert(scanRange1.isUpperBoundEqualTo)
+
+ val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
+ assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes(3)))
+ assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes(5)))
+ assert(!scanRange2.isLowerBoundEqualTo)
+ assert(scanRange2.isUpperBoundEqualTo)
- assert(executionRules.requiredQualifierDefinitionArray.length == 2)
}
/**
* A complex query with two complex ranges that does merge into one
*/
test("Test two complete range merge rowKey query") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
- "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
+ "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" +
"( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+ assert(results.length == 4)
+
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
+ "( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"))
+
+ assert(executionRules.rowKeyFilter.points.size == 0)
+ assert(executionRules.rowKeyFilter.ranges.size == 2)
+
+ val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+ assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get1")))
+ assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get2")))
+ assert(scanRange1.isLowerBoundEqualTo)
+ assert(scanRange1.isUpperBoundEqualTo)
+
+ val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
+ assert(Bytes.equals(scanRange2.lowerBound, Bytes.toBytes("get3")))
+ assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes("get5")))
+ assert(!scanRange2.isLowerBoundEqualTo)
+ assert(scanRange2.isUpperBoundEqualTo)
+ }
+
+ test("Test OR logic with a one RowKey and One column") {
+
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
+ "WHERE " +
+ "( KEY_FIELD >= 'get1' or A_FIELD <= 'foo2') or" +
+ "( KEY_FIELD > 'get3' or B_FIELD <= '4')").take(10)
+
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+ assert(results.length == 5)
+
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( ( KEY_FIELD >= 0 OR A_FIELD <= 1 ) OR " +
+ "( KEY_FIELD > 2 OR B_FIELD <= 3 ) )"))
+
+ assert(executionRules.rowKeyFilter.points.size == 0)
+ assert(executionRules.rowKeyFilter.ranges.size == 1)
+
+ val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+ //This is the main test for HBASE-14406:
+ //because the row key is OR-ed with a column qualifier predicate,
+ //no filter can be applied to the row key, so the scan range stays unbounded
+ assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+ assert(scanRange1.upperBound == null)
+ assert(scanRange1.isLowerBoundEqualTo)
+ assert(scanRange1.isUpperBoundEqualTo)
+ }
+
+ test("Test OR logic with a two columns") {
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
+ "WHERE " +
+ "( B_FIELD > '4' or A_FIELD <= 'foo2') or" +
+ "( A_FIELD > 'foo2' or B_FIELD < '4')").take(10)
+
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
assert(results.length == 5)
- assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
- val keyFieldFilter =
- executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
- assert(keyFieldFilter.ranges.length == 1)
- assert(keyFieldFilter.points.length == 0)
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( ( B_FIELD > 0 OR A_FIELD <= 1 ) OR " +
+ "( A_FIELD > 2 OR B_FIELD < 3 ) )"))
+
+ assert(executionRules.rowKeyFilter.points.size == 0)
+ assert(executionRules.rowKeyFilter.ranges.size == 1)
+
+ val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+ assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+ assert(scanRange1.upperBound == null)
+ assert(scanRange1.isLowerBoundEqualTo)
+ assert(scanRange1.isUpperBoundEqualTo)
- assert(executionRules.requiredQualifierDefinitionArray.length == 2)
}
- test("test table that doesn't exist") {
+ test("Test single RowKey Or Column logic") {
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
+ "WHERE " +
+ "( KEY_FIELD >= 'get4' or A_FIELD <= 'foo2' )").take(10)
+
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+ assert(results.length == 4)
+
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( KEY_FIELD >= 0 OR A_FIELD <= 1 )"))
+
+ assert(executionRules.rowKeyFilter.points.size == 0)
+ assert(executionRules.rowKeyFilter.ranges.size == 1)
+
+ val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
+ assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
+ assert(scanRange1.upperBound == null)
+ assert(scanRange1.isLowerBoundEqualTo)
+ assert(scanRange1.isUpperBoundEqualTo)
+ }
+
+
+ test("Test table that doesn't exist") {
intercept[TableNotFoundException] {
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map("hbase.columns.mapping" ->
@@ -315,6 +527,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
"( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
"( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").count()
}
+ DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
}
test("Test table with column that doesn't exist") {
@@ -325,11 +538,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseFactColumnTmp")
- val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseFactColumnTmp")
+ val result = sqlContext.sql("SELECT KEY_FIELD, " +
+ "B_FIELD, A_FIELD FROM hbaseFactColumnTmp")
assert(result.count() == 5)
- val localResult = result.take(5)
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+ assert(executionRules.dynamicLogicExpression == null)
+
}
test("Test table with INT column") {
@@ -343,11 +559,15 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntTmp"+
" where I_FIELD > 4 and I_FIELD < 10")
- assert(result.count() == 2)
-
- val localResult = result.take(3)
+ val localResult = result.take(5)
+ assert(localResult.length == 2)
assert(localResult(0).getInt(2) == 8)
+
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+ assert(executionRules.dynamicLogicExpression.toExpressionString.
+ equals("( I_FIELD > 0 AND I_FIELD < 1 )"))
+
}
test("Test table with INT column defined at wrong type") {
@@ -358,11 +578,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseIntWrongTypeTmp")
- val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+ val result = sqlContext.sql("SELECT KEY_FIELD, " +
+ "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
- assert(result.count() == 5)
+ val localResult = result.take(10)
+ assert(localResult.length == 5)
- val localResult = result.take(5)
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+ assert(executionRules.dynamicLogicExpression == null)
assert(localResult(0).getString(2).length == 4)
assert(localResult(0).getString(2).charAt(0).toByte == 0)
@@ -380,9 +603,13 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseBadTmp")
- val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseBadTmp")
+ val result = sqlContext.sql("SELECT KEY_FIELD, " +
+ "B_FIELD, I_FIELD FROM hbaseBadTmp")
- val localResult = result.take(5)
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+ assert(executionRules.dynamicLogicExpression == null)
+
+ result.take(5)
}
}
@@ -396,11 +623,15 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseIntWrongTypeTmp")
- val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+ val result = sqlContext.sql("SELECT KEY_FIELD, " +
+ "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
- assert(result.count() == 5)
+ val localResult = result.take(10)
+ assert(localResult.length == 5)
+
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+ assert(executionRules.dynamicLogicExpression == null)
- val localResult = result.take(5)
}
}
@@ -413,11 +644,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseIntWrongTypeTmp")
- val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+ val result = sqlContext.sql("SELECT KEY_FIELD, " +
+ "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
assert(result.count() == 5)
val localResult = result.take(5)
+ localResult.length
+
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+ assert(executionRules.dynamicLogicExpression == null)
+
+
}
}
@@ -430,11 +668,16 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
df.registerTempTable("hbaseIntWrongTypeTmp")
- val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+ val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, " +
+ "I_FIELD FROM hbaseIntWrongTypeTmp")
+
+ val localResult = result.take(10)
+ assert(localResult.length == 5)
+
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+ assert(executionRules.dynamicLogicExpression == null)
- assert(result.count() == 5)
- val localResult = result.take(5)
}
}
@@ -448,9 +691,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, Z_FIELD FROM hbaseZTmp")
- assert(result.count() == 5)
-
- val localResult = result.take(5)
+ val localResult = result.take(10)
+ assert(localResult.length == 5)
assert(localResult(0).getString(2) == null)
assert(localResult(1).getString(2) == "FOO")
@@ -458,5 +700,27 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(localResult(3).getString(2) == "BAR")
assert(localResult(4).getString(2) == null)
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+ assert(executionRules.dynamicLogicExpression == null)
+ }
+
+ test("Test with column logic disabled") {
+ df = sqlContext.load("org.apache.hadoop.hbase.spark",
+ Map("hbase.columns.mapping" ->
+ "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, Z_FIELD STRING c:z,",
+ "hbase.table" -> "t1",
+ "hbase.push.down.column.filter" -> "false"))
+
+ df.registerTempTable("hbaseNoPushDownTmp")
+
+ val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseNoPushDownTmp " +
+ "WHERE " +
+ "(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
+
+ val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+ assert(results.length == 2)
+
+ assert(executionRules.dynamicLogicExpression == null)
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/dae1775a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
new file mode 100644
index 0000000..3140ebd
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.util
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.Logging
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+class DynamicLogicExpressionSuite extends FunSuite with
+BeforeAndAfterEach with BeforeAndAfterAll with Logging {
+
+ test("Basic And Test") {
+ // Verifies AndLogicExpression twice: as constructed here and after a round trip
+ // through toExpressionString and DynamicLogicExpressionBuilder.build.
+ // Expression: ( Col1 < query(0) AND Col1 > query(1) ); the Int given to each
+ // leaf selects the slot of the query value array it is compared with.
+ val lessThanFirst = new LessThanLogicExpression("Col1", 0)
+ val greaterThanSecond = new GreaterThanLogicExpression("Col1", 1)
+ val andLogic = new AndLogicExpression(lessThanFirst, greaterThanSecond)
+
+ // The row being evaluated has a single column: Col1 = 10.
+ val rowValues = new util.HashMap[String, ByteArrayComparable]()
+ rowValues.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
+
+ // (query(0), query(1), expected result) against the row value 10:
+ // both sides hold; the left side fails (10 < 10); the right side fails (10 > 10).
+ val cases = Seq((15, 5, true), (10, 5, false), (15, 10, false))
+ val queryValues = new Array[Array[Byte]](2)
+
+ // First pass: the expression tree built by hand above.
+ for ((first, second, expected) <- cases) {
+ queryValues(0) = Bytes.toBytes(first)
+ queryValues(1) = Bytes.toBytes(second)
+ assert(andLogic.execute(rowValues, queryValues) == expected)
+ }
+
+ // The string form refers to query values by index, not by value.
+ val expressionString = andLogic.toExpressionString
+
+ assert(expressionString.equals("( Col1 < 0 AND Col1 > 1 )"))
+
+ // Second pass: an expression rebuilt from that string must give the same answers.
+ val builtExpression = DynamicLogicExpressionBuilder.build(expressionString)
+ for ((first, second, expected) <- cases) {
+ queryValues(0) = Bytes.toBytes(first)
+ queryValues(1) = Bytes.toBytes(second)
+ assert(builtExpression.execute(rowValues, queryValues) == expected)
+ }
+
+ }
+
+ test("Basic OR Test") {
+ // Verifies OrLogicExpression twice: as constructed here and after a round trip
+ // through toExpressionString and DynamicLogicExpressionBuilder.build.
+ // Expression: ( Col1 < query(0) OR Col1 > query(1) ); the Int given to each
+ // leaf selects the slot of the query value array it is compared with.
+ val lessThanFirst = new LessThanLogicExpression("Col1", 0)
+ val greaterThanSecond = new GreaterThanLogicExpression("Col1", 1)
+ val orLogic = new OrLogicExpression(lessThanFirst, greaterThanSecond)
+
+ // The row being evaluated has a single column: Col1 = 10.
+ val rowValues = new util.HashMap[String, ByteArrayComparable]()
+ rowValues.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
+
+ // (query(0), query(1), expected result) against the row value 10.
+ val cases = Seq(
+ (15, 5, true), // both sides hold
+ (10, 5, true), // only the right side holds (10 > 5)
+ (15, 10, true), // only the left side holds (10 < 15)
+ (10, 10, false)) // neither side holds
+
+ // One array is reused for every case, with both slots overwritten each time.
+ val queryValues = new Array[Array[Byte]](2)
+
+ // First pass: the expression tree built by hand above.
+ for ((first, second, expected) <- cases) {
+ queryValues(0) = Bytes.toBytes(first)
+ queryValues(1) = Bytes.toBytes(second)
+ assert(orLogic.execute(rowValues, queryValues) == expected)
+ }
+
+ // The string form refers to query values by index, not by value:
+ // 0 and 1 below are array slots, not the numbers being compared.
+ val expressionString = orLogic.toExpressionString
+
+ assert(expressionString.equals("( Col1 < 0 OR Col1 > 1 )"))
+
+ // Second pass: an expression rebuilt from that string must give the same
+ // answers for the same inputs.
+ val builtExpression = DynamicLogicExpressionBuilder.build(expressionString)
+ for ((first, second, expected) <- cases) {
+ queryValues(0) = Bytes.toBytes(first)
+ queryValues(1) = Bytes.toBytes(second)
+ assert(builtExpression.execute(rowValues, queryValues) == expected)
+ }
+
+ }
+
+ test("Basic Command Test") {
+ // Checks each leaf comparison on its own. Every leaf is bound to column Col1 and
+ // to slot 0 of the query value array, so only that slot changes between cases.
+ // Each comparison group contains at least one passing and one failing case, so
+ // an implementation that always returns the same answer cannot satisfy it.
+ val greaterLogic = new GreaterThanLogicExpression("Col1", 0)
+ val greaterAndEqualLogic = new GreaterThanOrEqualLogicExpression("Col1", 0)
+ val lessLogic = new LessThanLogicExpression("Col1", 0)
+ val lessAndEqualLogic = new LessThanOrEqualLogicExpression("Col1", 0)
+ val equalLogic = new EqualLogicExpression("Col1", 0, false)
+ val notEqualLogic = new EqualLogicExpression("Col1", 0, true)
+ val passThrough = new PassThroughLogicExpression
+
+ // The row value is fixed for the whole test: Col1 = 10.
+ val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
+ columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
+ val valueFromQueryValueArray = new Array[Array[Byte]](1)
+
+ // greater than: 10 > 5 holds, 10 > 10 and 10 > 20 do not
+ valueFromQueryValueArray(0) = Bytes.toBytes(5)
+ assert(greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+ valueFromQueryValueArray(0) = Bytes.toBytes(10)
+ assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+ valueFromQueryValueArray(0) = Bytes.toBytes(20)
+ assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+ // greater than or equal: 10 >= 5 and 10 >= 10 hold, 10 >= 20 does not
+ valueFromQueryValueArray(0) = Bytes.toBytes(5)
+ assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
+ valueFromQueryValueArray))
+ valueFromQueryValueArray(0) = Bytes.toBytes(10)
+ assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
+ valueFromQueryValueArray))
+ valueFromQueryValueArray(0) = Bytes.toBytes(20)
+ assert(!greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
+ valueFromQueryValueArray))
+
+ // less than: 10 < 20 holds, 10 < 10 and 10 < 5 do not
+ valueFromQueryValueArray(0) = Bytes.toBytes(20)
+ assert(lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+ valueFromQueryValueArray(0) = Bytes.toBytes(10)
+ assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+ valueFromQueryValueArray(0) = Bytes.toBytes(5)
+ assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+ // less than or equal: 10 <= 20 and 10 <= 10 hold, 10 <= 5 does not
+ valueFromQueryValueArray(0) = Bytes.toBytes(20)
+ assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+ valueFromQueryValueArray(0) = Bytes.toBytes(10)
+ assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+ valueFromQueryValueArray(0) = Bytes.toBytes(5)
+ assert(!lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+ // equal to (third constructor argument false): holds only for the query value 10
+ valueFromQueryValueArray(0) = Bytes.toBytes(10)
+ assert(equalLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+ valueFromQueryValueArray(0) = Bytes.toBytes(5)
+ assert(!equalLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+ // not equal to (third constructor argument true): the opposite of the group above
+ valueFromQueryValueArray(0) = Bytes.toBytes(10)
+ assert(!notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+ valueFromQueryValueArray(0) = Bytes.toBytes(5)
+ assert(notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+ // pass through: holds whatever the query value is
+ valueFromQueryValueArray(0) = Bytes.toBytes(10)
+ assert(passThrough.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+ valueFromQueryValueArray(0) = Bytes.toBytes(5)
+ assert(passThrough.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
+
+ }
+
+
+}