Posted to commits@hbase.apache.org by st...@apache.org on 2018/11/07 16:28:14 UTC
[2/9] hbase git commit: HBASE-21443 [hbase-connectors] Purge hbase-*
modules from core now they've been moved to hbase-connectors
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ffeb54a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
deleted file mode 100644
index afe515b..0000000
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ /dev/null
@@ -1,1063 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
-import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-import org.xml.sax.SAXParseException
-
-case class HBaseRecord(
- col0: String,
- col1: Boolean,
- col2: Double,
- col3: Float,
- col4: Int,
- col5: Long,
- col6: Short,
- col7: String,
- col8: Byte)
-
-object HBaseRecord {
- def apply(i: Int, t: String): HBaseRecord = {
- val s = s"""row${"%03d".format(i)}"""
- HBaseRecord(s,
- i % 2 == 0,
- i.toDouble,
- i.toFloat,
- i,
- i.toLong,
- i.toShort,
- s"String$i: $t",
- i.toByte)
- }
-}
-
-
-case class AvroHBaseKeyRecord(col0: Array[Byte],
- col1: Array[Byte])
-
-object AvroHBaseKeyRecord {
- val schemaString =
- s"""{"namespace": "example.avro",
- | "type": "record", "name": "User",
- | "fields": [ {"name": "name", "type": "string"},
- | {"name": "favorite_number", "type": ["int", "null"]},
- | {"name": "favorite_color", "type": ["string", "null"]} ] }""".stripMargin
-
- val avroSchema: Schema = {
- val p = new Schema.Parser
- p.parse(schemaString)
- }
-
- def apply(i: Int): AvroHBaseKeyRecord = {
- val user = new GenericData.Record(avroSchema);
- user.put("name", s"name${"%03d".format(i)}")
- user.put("favorite_number", i)
- user.put("favorite_color", s"color${"%03d".format(i)}")
- val avroByte = AvroSerdes.serialize(user, avroSchema)
- AvroHBaseKeyRecord(avroByte, avroByte)
- }
-}
-
-class DefaultSourceSuite extends FunSuite with
-BeforeAndAfterEach with BeforeAndAfterAll with Logging {
- @transient var sc: SparkContext = null
- var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
-
- val t1TableName = "t1"
- val t2TableName = "t2"
- val columnFamily = "c"
-
- var sqlContext:SQLContext = null
- var df:DataFrame = null
-
- override def beforeAll() {
-
- TEST_UTIL.startMiniCluster
-
- logInfo(" - minicluster started")
- try
- TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
- catch {
- case e: Exception => logInfo(" - no table " + t1TableName + " found")
- }
- try
- TEST_UTIL.deleteTable(TableName.valueOf(t2TableName))
- catch {
- case e: Exception => logInfo(" - no table " + t2TableName + " found")
- }
- logInfo(" - creating table " + t1TableName)
- TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily))
- logInfo(" - created table")
- logInfo(" - creating table " + t2TableName)
- TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
- logInfo(" - created table")
- val sparkConf = new SparkConf
- sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
- sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
- sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
-
- sc = new SparkContext("local", "test", sparkConf)
-
- val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
- try {
- val t1Table = connection.getTable(TableName.valueOf("t1"))
-
- try {
- var put = new Put(Bytes.toBytes("get1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
- t1Table.put(put)
- put = new Put(Bytes.toBytes("get2"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
- t1Table.put(put)
- put = new Put(Bytes.toBytes("get3"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
- t1Table.put(put)
- put = new Put(Bytes.toBytes("get4"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
- t1Table.put(put)
- put = new Put(Bytes.toBytes("get5"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
- t1Table.put(put)
- } finally {
- t1Table.close()
- }
-
- val t2Table = connection.getTable(TableName.valueOf("t2"))
-
- try {
- var put = new Put(Bytes.toBytes(1))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
- t2Table.put(put)
- put = new Put(Bytes.toBytes(2))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
- t2Table.put(put)
- put = new Put(Bytes.toBytes(3))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
- t2Table.put(put)
- put = new Put(Bytes.toBytes(4))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
- t2Table.put(put)
- put = new Put(Bytes.toBytes(5))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
- t2Table.put(put)
- } finally {
- t2Table.close()
- }
- } finally {
- connection.close()
- }
-
- def hbaseTable1Catalog = s"""{
- |"table":{"namespace":"default", "name":"t1"},
- |"rowkey":"key",
- |"columns":{
- |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
- |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
- |"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
- |}
- |}""".stripMargin
-
- new HBaseContext(sc, TEST_UTIL.getConfiguration)
- sqlContext = new SQLContext(sc)
-
- df = sqlContext.load("org.apache.hadoop.hbase.spark",
- Map(HBaseTableCatalog.tableCatalog->hbaseTable1Catalog))
-
- df.registerTempTable("hbaseTable1")
-
- def hbaseTable2Catalog = s"""{
- |"table":{"namespace":"default", "name":"t2"},
- |"rowkey":"key",
- |"columns":{
- |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"int"},
- |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
- |"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
- |}
- |}""".stripMargin
-
-
- df = sqlContext.load("org.apache.hadoop.hbase.spark",
- Map(HBaseTableCatalog.tableCatalog->hbaseTable2Catalog))
-
- df.registerTempTable("hbaseTable2")
- }
-
- override def afterAll() {
- TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
- logInfo("shuting down minicluster")
- TEST_UTIL.shutdownMiniCluster()
-
- sc.stop()
- }
-
- override def beforeEach(): Unit = {
- DefaultSourceStaticUtils.lastFiveExecutionRules.clear()
- }
-
-
- /**
- * An example of querying three fields while only using rowKey points for the filter
- */
- test("Test rowKey point only rowKey query") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
- "WHERE " +
- "(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')").take(10)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(results.length == 3)
-
- assert(executionRules.dynamicLogicExpression.toExpressionString.
- equals("( ( KEY_FIELD == 0 OR KEY_FIELD == 1 ) OR KEY_FIELD == 2 )"))
-
- assert(executionRules.rowKeyFilter.points.size == 3)
- assert(executionRules.rowKeyFilter.ranges.size == 0)
- }
-
- /**
- * An example of querying three fields while only using cell points for the filter
- */
- test("Test cell point only rowKey query") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
- "WHERE " +
- "(B_FIELD = '4' or B_FIELD = '10' or A_FIELD = 'foo1')").take(10)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(results.length == 3)
-
- assert(executionRules.dynamicLogicExpression.toExpressionString.
- equals("( ( B_FIELD == 0 OR B_FIELD == 1 ) OR A_FIELD == 2 )"))
- }
-
- /**
- * An example of an OR of two ranges where the result is two separate scan ranges.
- * Also an example of less-than and greater-than.
- */
- test("Test two range rowKey query") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
- "WHERE " +
- "( KEY_FIELD < 'get2' or KEY_FIELD > 'get3')").take(10)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(results.length == 3)
-
- assert(executionRules.dynamicLogicExpression.toExpressionString.
- equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
-
- assert(executionRules.rowKeyFilter.points.size == 0)
- assert(executionRules.rowKeyFilter.ranges.size == 2)
-
- val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
- assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
- assert(Bytes.equals(scanRange1.upperBound,Bytes.toBytes("get2")))
- assert(scanRange1.isLowerBoundEqualTo)
- assert(!scanRange1.isUpperBoundEqualTo)
-
- val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
- assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes("get3")))
- assert(scanRange2.upperBound == null)
- assert(!scanRange2.isLowerBoundEqualTo)
- assert(scanRange2.isUpperBoundEqualTo)
- }
-
- /**
- * An example of an OR merge between two overlapping ranges.
- * Also an example of less-than and greater-than.
- *
- * This example makes sure the code works for an int rowKey.
- */
- test("Test two range rowKey query where the rowKey is Int and there is a range over lap") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
- "WHERE " +
- "( KEY_FIELD < 4 or KEY_FIELD > 2)").take(10)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-
-
- assert(executionRules.dynamicLogicExpression.toExpressionString.
- equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
-
- assert(executionRules.rowKeyFilter.points.size == 0)
- assert(executionRules.rowKeyFilter.ranges.size == 2)
- assert(results.length == 5)
- }
-
- /**
- * An example of an OR between two ranges that don't overlap, so the result is kept as separate ranges.
- * Also an example of less-than and greater-than.
- *
- * This example makes sure the code works for an int rowKey.
- */
- test("Test two range rowKey query where the rowKey is Int and the ranges don't over lap") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
- "WHERE " +
- "( KEY_FIELD < 2 or KEY_FIELD > 4)").take(10)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(executionRules.dynamicLogicExpression.toExpressionString.
- equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
-
- assert(executionRules.rowKeyFilter.points.size == 0)
-
- assert(executionRules.rowKeyFilter.ranges.size == 3)
-
- val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
- assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
- assert(scanRange1.isLowerBoundEqualTo)
- assert(!scanRange1.isUpperBoundEqualTo)
-
- val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
- assert(scanRange2.isUpperBoundEqualTo)
-
- assert(results.length == 2)
- }
-
- /**
- * An example of an AND merge between two ranges where the result is one range.
- * Also an example of less-than-or-equal-to and greater-than-or-equal-to.
- */
- test("Test one combined range rowKey query") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
- "WHERE " +
- "(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(results.length == 2)
-
- val expr = executionRules.dynamicLogicExpression.toExpressionString
- assert(expr.equals("( ( KEY_FIELD isNotNull AND KEY_FIELD <= 0 ) AND KEY_FIELD >= 1 )"), expr)
-
- assert(executionRules.rowKeyFilter.points.size == 0)
- assert(executionRules.rowKeyFilter.ranges.size == 1)
-
- val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
- assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get2")))
- assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get3")))
- assert(scanRange1.isLowerBoundEqualTo)
- assert(scanRange1.isUpperBoundEqualTo)
-
- }
-
- /**
- * Do a select with no filters
- */
- test("Test select only query") {
-
- val results = df.select("KEY_FIELD").take(10)
- assert(results.length == 5)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(executionRules.dynamicLogicExpression == null)
-
- }
-
- /**
- * A complex query with one point and one range for both the
- * rowKey and a column
- */
- test("Test SQL point and range combo") {
- val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTable1 " +
- "WHERE " +
- "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
- "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(executionRules.dynamicLogicExpression.toExpressionString.
- equals("( ( KEY_FIELD == 0 AND B_FIELD < 1 ) OR " +
- "( KEY_FIELD >= 2 AND B_FIELD == 3 ) )"))
-
- assert(executionRules.rowKeyFilter.points.size == 1)
- assert(executionRules.rowKeyFilter.ranges.size == 1)
-
- val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
- assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get3")))
- assert(scanRange1.upperBound == null)
- assert(scanRange1.isLowerBoundEqualTo)
- assert(scanRange1.isUpperBoundEqualTo)
-
-
- assert(results.length == 3)
- }
-
- /**
- * A complex query with two complex ranges that don't merge into one
- */
- test("Test two complete range non merge rowKey query") {
-
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
- "WHERE " +
- "( KEY_FIELD >= 1 and KEY_FIELD <= 2) or" +
- "( KEY_FIELD > 3 and KEY_FIELD <= 5)").take(10)
-
-
- assert(results.length == 4)
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
- assert(executionRules.dynamicLogicExpression.toExpressionString.
- equals("( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
- "( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"))
-
- assert(executionRules.rowKeyFilter.points.size == 0)
- assert(executionRules.rowKeyFilter.ranges.size == 2)
-
- val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
- assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes(1)))
- assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
- assert(scanRange1.isLowerBoundEqualTo)
- assert(scanRange1.isUpperBoundEqualTo)
-
- val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
- assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes(3)))
- assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes(5)))
- assert(!scanRange2.isLowerBoundEqualTo)
- assert(scanRange2.isUpperBoundEqualTo)
-
- }
-
- /**
- * A complex query with two complex ranges that do merge into one
- */
- test("Test two complete range merge rowKey query") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
- "WHERE " +
- "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" +
- "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(results.length == 4)
-
- assert(executionRules.dynamicLogicExpression.toExpressionString.
- equals("( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
- "( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"))
-
- assert(executionRules.rowKeyFilter.points.size == 0)
- assert(executionRules.rowKeyFilter.ranges.size == 2)
-
- val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
- assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get1")))
- assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get2")))
- assert(scanRange1.isLowerBoundEqualTo)
- assert(scanRange1.isUpperBoundEqualTo)
-
- val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
- assert(Bytes.equals(scanRange2.lowerBound, Bytes.toBytes("get3")))
- assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes("get5")))
- assert(!scanRange2.isLowerBoundEqualTo)
- assert(scanRange2.isUpperBoundEqualTo)
- }
-
- test("Test OR logic with a one RowKey and One column") {
-
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
- "WHERE " +
- "( KEY_FIELD >= 'get1' or A_FIELD <= 'foo2') or" +
- "( KEY_FIELD > 'get3' or B_FIELD <= '4')").take(10)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(results.length == 5)
-
- assert(executionRules.dynamicLogicExpression.toExpressionString.
- equals("( ( KEY_FIELD >= 0 OR A_FIELD <= 1 ) OR " +
- "( KEY_FIELD > 2 OR B_FIELD <= 3 ) )"))
-
- assert(executionRules.rowKeyFilter.points.size == 0)
- assert(executionRules.rowKeyFilter.ranges.size == 1)
-
- val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
- // This is the main test for 14406.
- // Because the key is joined through an OR with a qualifier,
- // there is no filter on the rowKey.
- assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
- assert(scanRange1.upperBound == null)
- assert(scanRange1.isLowerBoundEqualTo)
- assert(scanRange1.isUpperBoundEqualTo)
- }
-
- test("Test OR logic with a two columns") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
- "WHERE " +
- "( B_FIELD > '4' or A_FIELD <= 'foo2') or" +
- "( A_FIELD > 'foo2' or B_FIELD < '4')").take(10)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(results.length == 5)
-
- assert(executionRules.dynamicLogicExpression.toExpressionString.
- equals("( ( B_FIELD > 0 OR A_FIELD <= 1 ) OR " +
- "( A_FIELD > 2 OR B_FIELD < 3 ) )"))
-
- assert(executionRules.rowKeyFilter.points.size == 0)
- assert(executionRules.rowKeyFilter.ranges.size == 1)
-
- val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
- assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
- assert(scanRange1.upperBound == null)
- assert(scanRange1.isLowerBoundEqualTo)
- assert(scanRange1.isUpperBoundEqualTo)
-
- }
-
- test("Test single RowKey Or Column logic") {
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
- "WHERE " +
- "( KEY_FIELD >= 'get4' or A_FIELD <= 'foo2' )").take(10)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(results.length == 4)
-
- assert(executionRules.dynamicLogicExpression.toExpressionString.
- equals("( KEY_FIELD >= 0 OR A_FIELD <= 1 )"))
-
- assert(executionRules.rowKeyFilter.points.size == 0)
- assert(executionRules.rowKeyFilter.ranges.size == 1)
-
- val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
- assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
- assert(scanRange1.upperBound == null)
- assert(scanRange1.isLowerBoundEqualTo)
- assert(scanRange1.isUpperBoundEqualTo)
- }
-
- test("Test table that doesn't exist") {
- val catalog = s"""{
- |"table":{"namespace":"default", "name":"t1NotThere"},
- |"rowkey":"key",
- |"columns":{
- |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
- |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
- |"B_FIELD":{"cf":"c", "col":"c", "type":"string"}
- |}
- |}""".stripMargin
-
- intercept[Exception] {
- df = sqlContext.load("org.apache.hadoop.hbase.spark",
- Map(HBaseTableCatalog.tableCatalog->catalog))
-
- df.registerTempTable("hbaseNonExistingTmp")
-
- sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseNonExistingTmp " +
- "WHERE " +
- "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
- "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").count()
- }
- DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
- }
-
-
- test("Test table with column that doesn't exist") {
- val catalog = s"""{
- |"table":{"namespace":"default", "name":"t1"},
- |"rowkey":"key",
- |"columns":{
- |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
- |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
- |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
- |"C_FIELD":{"cf":"c", "col":"c", "type":"string"}
- |}
- |}""".stripMargin
- df = sqlContext.load("org.apache.hadoop.hbase.spark",
- Map(HBaseTableCatalog.tableCatalog->catalog))
-
- df.registerTempTable("hbaseFactColumnTmp")
-
- val result = sqlContext.sql("SELECT KEY_FIELD, " +
- "B_FIELD, A_FIELD FROM hbaseFactColumnTmp")
-
- assert(result.count() == 5)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
- assert(executionRules.dynamicLogicExpression == null)
-
- }
-
- test("Test table with INT column") {
- val catalog = s"""{
- |"table":{"namespace":"default", "name":"t1"},
- |"rowkey":"key",
- |"columns":{
- |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
- |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
- |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
- |"I_FIELD":{"cf":"c", "col":"i", "type":"int"}
- |}
- |}""".stripMargin
- df = sqlContext.load("org.apache.hadoop.hbase.spark",
- Map(HBaseTableCatalog.tableCatalog->catalog))
-
- df.registerTempTable("hbaseIntTmp")
-
- val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntTmp"+
- " where I_FIELD > 4 and I_FIELD < 10")
-
- val localResult = result.take(5)
-
- assert(localResult.length == 2)
- assert(localResult(0).getInt(2) == 8)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
- val expr = executionRules.dynamicLogicExpression.toExpressionString
- logInfo(expr)
- assert(expr.equals("( ( I_FIELD isNotNull AND I_FIELD > 0 ) AND I_FIELD < 1 )"), expr)
-
- }
-
- test("Test table with INT column defined at wrong type") {
- val catalog = s"""{
- |"table":{"namespace":"default", "name":"t1"},
- |"rowkey":"key",
- |"columns":{
- |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
- |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
- |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
- |"I_FIELD":{"cf":"c", "col":"i", "type":"string"}
- |}
- |}""".stripMargin
- df = sqlContext.load("org.apache.hadoop.hbase.spark",
- Map(HBaseTableCatalog.tableCatalog->catalog))
-
- df.registerTempTable("hbaseIntWrongTypeTmp")
-
- val result = sqlContext.sql("SELECT KEY_FIELD, " +
- "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
-
- val localResult = result.take(10)
- assert(localResult.length == 5)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
- assert(executionRules.dynamicLogicExpression == null)
-
- assert(localResult(0).getString(2).length == 4)
- assert(localResult(0).getString(2).charAt(0).toByte == 0)
- assert(localResult(0).getString(2).charAt(1).toByte == 0)
- assert(localResult(0).getString(2).charAt(2).toByte == 0)
- assert(localResult(0).getString(2).charAt(3).toByte == 1)
- }
-
- test("Test bad column type") {
- val catalog = s"""{
- |"table":{"namespace":"default", "name":"t1"},
- |"rowkey":"key",
- |"columns":{
- |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"FOOBAR"},
- |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
- |"I_FIELD":{"cf":"c", "col":"i", "type":"string"}
- |}
- |}""".stripMargin
- intercept[Exception] {
- df = sqlContext.load("org.apache.hadoop.hbase.spark",
- Map(HBaseTableCatalog.tableCatalog->catalog))
-
- df.registerTempTable("hbaseIntWrongTypeTmp")
-
- val result = sqlContext.sql("SELECT KEY_FIELD, " +
- "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
-
- val localResult = result.take(10)
- assert(localResult.length == 5)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
- assert(executionRules.dynamicLogicExpression == null)
-
- }
- }
-
- test("Test HBaseSparkConf matching") {
- val df = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
- Map("cacheSize" -> "100",
- "batchNum" -> "100",
- "blockCacheingEnable" -> "true", "rowNum" -> "10"))
- assert(df.count() == 10)
-
- val df1 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
- Map("cacheSize" -> "1000",
- "batchNum" -> "100", "blockCacheingEnable" -> "true", "rowNum" -> "10"))
- intercept[Exception] {
- assert(df1.count() == 10)
- }
-
- val df2 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
- Map("cacheSize" -> "100",
- "batchNum" -> "1000", "blockCacheingEnable" -> "true", "rowNum" -> "10"))
- intercept[Exception] {
- assert(df2.count() == 10)
- }
-
- val df3 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
- Map("cacheSize" -> "100",
- "batchNum" -> "100", "blockCacheingEnable" -> "false", "rowNum" -> "10"))
- intercept[Exception] {
- assert(df3.count() == 10)
- }
- }
-
- test("Test table with sparse column") {
- val catalog = s"""{
- |"table":{"namespace":"default", "name":"t1"},
- |"rowkey":"key",
- |"columns":{
- |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
- |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
- |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
- |"Z_FIELD":{"cf":"c", "col":"z", "type":"string"}
- |}
- |}""".stripMargin
- df = sqlContext.load("org.apache.hadoop.hbase.spark",
- Map(HBaseTableCatalog.tableCatalog->catalog))
-
- df.registerTempTable("hbaseZTmp")
-
- val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, Z_FIELD FROM hbaseZTmp")
-
- val localResult = result.take(10)
- assert(localResult.length == 5)
-
- assert(localResult(0).getString(2) == null)
- assert(localResult(1).getString(2) == "FOO")
- assert(localResult(2).getString(2) == null)
- assert(localResult(3).getString(2) == "BAR")
- assert(localResult(4).getString(2) == null)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
- assert(executionRules.dynamicLogicExpression == null)
- }
-
- test("Test with column logic disabled") {
- val catalog = s"""{
- |"table":{"namespace":"default", "name":"t1"},
- |"rowkey":"key",
- |"columns":{
- |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
- |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
- |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
- |"Z_FIELD":{"cf":"c", "col":"z", "type":"string"}
- |}
- |}""".stripMargin
- df = sqlContext.load("org.apache.hadoop.hbase.spark",
- Map(HBaseTableCatalog.tableCatalog->catalog,
- HBaseSparkConf.PUSHDOWN_COLUMN_FILTER -> "false"))
-
- df.registerTempTable("hbaseNoPushDownTmp")
-
- val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseNoPushDownTmp " +
- "WHERE " +
- "(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
-
- val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
- assert(results.length == 2)
-
- assert(executionRules.dynamicLogicExpression == null)
- }
-
- def writeCatalog = s"""{
- |"table":{"namespace":"default", "name":"table1"},
- |"rowkey":"key",
- |"columns":{
- |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
- |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
- |"col2":{"cf":"cf1", "col":"col2", "type":"double"},
- |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
- |"col4":{"cf":"cf3", "col":"col4", "type":"int"},
- |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
- |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
- |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
- |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
- |}
- |}""".stripMargin
-
- def withCatalog(cat: String): DataFrame = {
- sqlContext
- .read
- .options(Map(HBaseTableCatalog.tableCatalog->cat))
- .format("org.apache.hadoop.hbase.spark")
- .load()
- }
-
- test("populate table") {
- val sql = sqlContext
- import sql.implicits._
- val data = (0 to 255).map { i =>
- HBaseRecord(i, "extra")
- }
- sc.parallelize(data).toDF.write.options(
- Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.newTable -> "5"))
- .format("org.apache.hadoop.hbase.spark")
- .save()
- }
-
- test("empty column") {
- val df = withCatalog(writeCatalog)
- df.registerTempTable("table0")
- val c = sqlContext.sql("select count(1) from table0").rdd.collect()(0)(0).asInstanceOf[Long]
- assert(c == 256)
- }
-
- test("full query") {
- val df = withCatalog(writeCatalog)
- df.show()
- assert(df.count() == 256)
- }
-
- test("filtered query0") {
- val sql = sqlContext
- import sql.implicits._
- val df = withCatalog(writeCatalog)
- val s = df.filter($"col0" <= "row005")
- .select("col0", "col1")
- s.show()
- assert(s.count() == 6)
- }
-
- test("Timestamp semantics") {
- val sql = sqlContext
- import sql.implicits._
-
- // There's already some recent data in here from the earlier tests. Let's throw something in
- // from 1993, which we can include/exclude, and add some data with the implicit (now) timestamp.
- // Then we should be able to cross-section it and only get points in between, get the most recent
- // view, and get an old view.
- val oldMs = 754869600000L
- val startMs = System.currentTimeMillis()
- val oldData = (0 to 100).map { i =>
- HBaseRecord(i, "old")
- }
- val newData = (200 to 255).map { i =>
- HBaseRecord(i, "new")
- }
-
- sc.parallelize(oldData).toDF.write.options(
- Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5",
- HBaseSparkConf.TIMESTAMP -> oldMs.toString))
- .format("org.apache.hadoop.hbase.spark")
- .save()
- sc.parallelize(newData).toDF.write.options(
- Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5"))
- .format("org.apache.hadoop.hbase.spark")
- .save()
-
- // Test specific timestamp -- Full scan, Timestamp
- val individualTimestamp = sqlContext.read
- .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> oldMs.toString))
- .format("org.apache.hadoop.hbase.spark")
- .load()
- assert(individualTimestamp.count() == 101)
-
- // Test getting everything -- Full Scan, No range
- val everything = sqlContext.read
- .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog))
- .format("org.apache.hadoop.hbase.spark")
- .load()
- assert(everything.count() == 256)
- // Test getting everything -- Pruned Scan, TimeRange
- val element50 = everything.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
- assert(element50 == "String50: extra")
- val element200 = everything.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
- assert(element200 == "String200: new")
-
- // Test Getting old stuff -- Full Scan, TimeRange
- val oldRange = sqlContext.read
- .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
- HBaseSparkConf.TIMERANGE_END -> (oldMs + 100).toString))
- .format("org.apache.hadoop.hbase.spark")
- .load()
- assert(oldRange.count() == 101)
- // Test Getting old stuff -- Pruned Scan, TimeRange
- val oldElement50 = oldRange.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
- assert(oldElement50 == "String50: old")
-
- // Test Getting middle stuff -- Full Scan, TimeRange
- val middleRange = sqlContext.read
- .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
- HBaseSparkConf.TIMERANGE_END -> (startMs + 100).toString))
- .format("org.apache.hadoop.hbase.spark")
- .load()
- assert(middleRange.count() == 256)
- // Test Getting middle stuff -- Pruned Scan, TimeRange
- val middleElement200 = middleRange.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
- assert(middleElement200 == "String200: extra")
- }
-
-
- // catalog for insertion
- def avroWriteCatalog = s"""{
- |"table":{"namespace":"default", "name":"avrotable"},
- |"rowkey":"key",
- |"columns":{
- |"col0":{"cf":"rowkey", "col":"key", "type":"binary"},
- |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
- |}
- |}""".stripMargin
-
- // catalog for read
- def avroCatalog = s"""{
- |"table":{"namespace":"default", "name":"avrotable"},
- |"rowkey":"key",
- |"columns":{
- |"col0":{"cf":"rowkey", "col":"key", "avro":"avroSchema"},
- |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
- |}
- |}""".stripMargin
-
- // for insert to another table
- def avroCatalogInsert = s"""{
- |"table":{"namespace":"default", "name":"avrotableInsert"},
- |"rowkey":"key",
- |"columns":{
- |"col0":{"cf":"rowkey", "col":"key", "avro":"avroSchema"},
- |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
- |}
- |}""".stripMargin
-
- def withAvroCatalog(cat: String): DataFrame = {
- sqlContext
- .read
- .options(Map("avroSchema"->AvroHBaseKeyRecord.schemaString,
- HBaseTableCatalog.tableCatalog->avroCatalog))
- .format("org.apache.hadoop.hbase.spark")
- .load()
- }
-
-
- test("populate avro table") {
- val sql = sqlContext
- import sql.implicits._
-
- val data = (0 to 255).map { i =>
- AvroHBaseKeyRecord(i)
- }
- sc.parallelize(data).toDF.write.options(
- Map(HBaseTableCatalog.tableCatalog -> avroWriteCatalog,
- HBaseTableCatalog.newTable -> "5"))
- .format("org.apache.hadoop.hbase.spark")
- .save()
- }
-
- test("avro empty column") {
- val df = withAvroCatalog(avroCatalog)
- df.registerTempTable("avrotable")
- val c = sqlContext.sql("select count(1) from avrotable")
- .rdd.collect()(0)(0).asInstanceOf[Long]
- assert(c == 256)
- }
-
- test("avro full query") {
- val df = withAvroCatalog(avroCatalog)
- df.show()
- df.printSchema()
- assert(df.count() == 256)
- }
-
- test("avro serialization and deserialization query") {
- val df = withAvroCatalog(avroCatalog)
- df.write.options(
- Map("avroSchema"->AvroHBaseKeyRecord.schemaString,
- HBaseTableCatalog.tableCatalog->avroCatalogInsert,
- HBaseTableCatalog.newTable -> "5"))
- .format("org.apache.hadoop.hbase.spark")
- .save()
- val newDF = withAvroCatalog(avroCatalogInsert)
- newDF.show()
- newDF.printSchema()
- assert(newDF.count() == 256)
- }
-
- test("avro filtered query") {
- val sql = sqlContext
- import sql.implicits._
- val df = withAvroCatalog(avroCatalog)
- val r = df.filter($"col1.name" === "name005" || $"col1.name" <= "name005")
- .select("col0", "col1.favorite_color", "col1.favorite_number")
- r.show()
- assert(r.count() == 6)
- }
-
- test("avro Or filter") {
- val sql = sqlContext
- import sql.implicits._
- val df = withAvroCatalog(avroCatalog)
- val s = df.filter($"col1.name" <= "name005" || $"col1.name".contains("name007"))
- .select("col0", "col1.favorite_color", "col1.favorite_number")
- s.show()
- assert(s.count() == 7)
- }
-
- test("test create HBaseRelation with new context throws SAXParseException") {
- val catalog = s"""{
- |"table":{"namespace":"default", "name":"t1NotThere"},
- |"rowkey":"key",
- |"columns":{
- |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
- |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
- |"B_FIELD":{"cf":"c", "col":"c", "type":"string"}
- |}
- |}""".stripMargin
- try {
- HBaseRelation(Map(HBaseTableCatalog.tableCatalog -> catalog,
- HBaseSparkConf.USE_HBASECONTEXT -> "false"), None)(sqlContext)
- } catch {
- case e: Throwable => if(e.getCause.isInstanceOf[SAXParseException]) {
- fail("SAXParseException due to configuration loading empty resource")
- } else {
- println("Failed due to some other exception, ignore " + e.getMessage)
- }
- }
- }
-}
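
For readers following the move to hbase-connectors: the read path this suite exercises reduces to a catalog JSON plus a DataFrame load through the connector's data source. Below is a minimal sketch of that pattern using only calls that appear in the suite above; the SparkContext (sc), the HBase Configuration (conf), and the table/column names are illustrative assumptions rather than part of the removed test.

  // Minimal read-path sketch. Assumes an existing SparkContext (sc) and HBase
  // Configuration (conf); table "t1" with column family "c" mirrors the fixture above.
  import org.apache.hadoop.hbase.spark.HBaseContext
  import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
  import org.apache.spark.sql.SQLContext

  val catalog = s"""{
    |"table":{"namespace":"default", "name":"t1"},
    |"rowkey":"key",
    |"columns":{
    |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
    |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
    |"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
    |}
    |}""".stripMargin

  new HBaseContext(sc, conf)              // register the HBase configuration for the relation
  val sqlContext = new SQLContext(sc)
  val df = sqlContext.read
    .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
    .format("org.apache.hadoop.hbase.spark")
    .load()
  df.filter(df("KEY_FIELD") === "get1").show()  // equality on the key is pushed down as a row-key point
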
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ffeb54a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
deleted file mode 100644
index 0424527..0000000
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util
-
-import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, JavaBytesEncoder}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.sql.types._
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-
-class DynamicLogicExpressionSuite extends FunSuite with
-BeforeAndAfterEach with BeforeAndAfterAll with Logging {
-
- val encoder = JavaBytesEncoder.create(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
-
- test("Basic And Test") {
- val leftLogic = new LessThanLogicExpression("Col1", 0)
- leftLogic.setEncoder(encoder)
- val rightLogic = new GreaterThanLogicExpression("Col1", 1)
- rightLogic.setEncoder(encoder)
- val andLogic = new AndLogicExpression(leftLogic, rightLogic)
-
- val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
-
- columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
- val valueFromQueryValueArray = new Array[Array[Byte]](2)
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
- assert(andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
- assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
- assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- val expressionString = andLogic.toExpressionString
-
- assert(expressionString.equals("( Col1 < 0 AND Col1 > 1 )"))
-
- val builtExpression = DynamicLogicExpressionBuilder.build(expressionString, encoder)
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
- assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
- assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
- assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- }
-
- test("Basic OR Test") {
- val leftLogic = new LessThanLogicExpression("Col1", 0)
- leftLogic.setEncoder(encoder)
- val rightLogic = new GreaterThanLogicExpression("Col1", 1)
- rightLogic.setEncoder(encoder)
- val OrLogic = new OrLogicExpression(leftLogic, rightLogic)
-
- val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
-
- columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
- val valueFromQueryValueArray = new Array[Array[Byte]](2)
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
- assert(OrLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
- assert(OrLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
- assert(OrLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
- assert(!OrLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- val expressionString = OrLogic.toExpressionString
-
- assert(expressionString.equals("( Col1 < 0 OR Col1 > 1 )"))
-
- val builtExpression = DynamicLogicExpressionBuilder.build(expressionString, encoder)
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
- assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
- assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
- assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
- valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
- assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
- }
-
- test("Basic Command Test") {
- val greaterLogic = new GreaterThanLogicExpression("Col1", 0)
- greaterLogic.setEncoder(encoder)
- val greaterAndEqualLogic = new GreaterThanOrEqualLogicExpression("Col1", 0)
- greaterAndEqualLogic.setEncoder(encoder)
- val lessLogic = new LessThanLogicExpression("Col1", 0)
- lessLogic.setEncoder(encoder)
- val lessAndEqualLogic = new LessThanOrEqualLogicExpression("Col1", 0)
- lessAndEqualLogic.setEncoder(encoder)
- val equalLogic = new EqualLogicExpression("Col1", 0, false)
- val notEqualLogic = new EqualLogicExpression("Col1", 0, true)
- val passThrough = new PassThroughLogicExpression
-
- val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
- columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))
- val valueFromQueryValueArray = new Array[Array[Byte]](1)
-
- // greater than
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
- assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 20)
- assert(!greaterLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- // greater than or equal
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 5)
- assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
- valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
- assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
- valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 20)
- assert(!greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
- valueFromQueryValueArray))
-
- // less than
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
- assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 5)
- assert(!lessLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- // less than or equal
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 20)
- assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 20)
- assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
- assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- // equal to
- valueFromQueryValueArray(0) = Bytes.toBytes(10)
- assert(equalLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = Bytes.toBytes(5)
- assert(!equalLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- // not equal to
- valueFromQueryValueArray(0) = Bytes.toBytes(10)
- assert(!notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = Bytes.toBytes(5)
- assert(notEqualLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- // pass through
- valueFromQueryValueArray(0) = Bytes.toBytes(10)
- assert(passThrough.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = Bytes.toBytes(5)
- assert(passThrough.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
- }
-
-
- test("Double Type") {
- val leftLogic = new LessThanLogicExpression("Col1", 0)
- leftLogic.setEncoder(encoder)
- val rightLogic = new GreaterThanLogicExpression("Col1", 1)
- rightLogic.setEncoder(encoder)
- val andLogic = new AndLogicExpression(leftLogic, rightLogic)
-
- val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
-
- columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(-4.0d)))
- val valueFromQueryValueArray = new Array[Array[Byte]](2)
- valueFromQueryValueArray(0) = encoder.encode(DoubleType, 15.0d)
- valueFromQueryValueArray(1) = encoder.encode(DoubleType, -5.0d)
- assert(andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(DoubleType, 10.0d)
- valueFromQueryValueArray(1) = encoder.encode(DoubleType, -1.0d)
- assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(DoubleType, -10.0d)
- valueFromQueryValueArray(1) = encoder.encode(DoubleType, -20.0d)
- assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- val expressionString = andLogic.toExpressionString
- // Note that here 0 and 1 are indexes, not values.
- assert(expressionString.equals("( Col1 < 0 AND Col1 > 1 )"))
-
- val builtExpression = DynamicLogicExpressionBuilder.build(expressionString, encoder)
- valueFromQueryValueArray(0) = encoder.encode(DoubleType, 15.0d)
- valueFromQueryValueArray(1) = encoder.encode(DoubleType, -5.0d)
- assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(DoubleType, 10.0d)
- valueFromQueryValueArray(1) = encoder.encode(DoubleType, -1.0d)
- assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(DoubleType, -10.0d)
- valueFromQueryValueArray(1) = encoder.encode(DoubleType, -20.0d)
- assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
- }
-
- test("Float Type") {
- val leftLogic = new LessThanLogicExpression("Col1", 0)
- leftLogic.setEncoder(encoder)
- val rightLogic = new GreaterThanLogicExpression("Col1", 1)
- rightLogic.setEncoder(encoder)
- val andLogic = new AndLogicExpression(leftLogic, rightLogic)
-
- val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
-
- columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(-4.0f)))
- val valueFromQueryValueArray = new Array[Array[Byte]](2)
- valueFromQueryValueArray(0) = encoder.encode(FloatType, 15.0f)
- valueFromQueryValueArray(1) = encoder.encode(FloatType, -5.0f)
- assert(andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(FloatType, 10.0f)
- valueFromQueryValueArray(1) = encoder.encode(FloatType, -1.0f)
- assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(FloatType, -10.0f)
- valueFromQueryValueArray(1) = encoder.encode(FloatType, -20.0f)
- assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- val expressionString = andLogic.toExpressionString
- // Note that here 0 and 1 are indexes, not values.
- assert(expressionString.equals("( Col1 < 0 AND Col1 > 1 )"))
-
- val builtExpression = DynamicLogicExpressionBuilder.build(expressionString, encoder)
- valueFromQueryValueArray(0) = encoder.encode(FloatType, 15.0f)
- valueFromQueryValueArray(1) = encoder.encode(FloatType, -5.0f)
- assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(FloatType, 10.0f)
- valueFromQueryValueArray(1) = encoder.encode(FloatType, -1.0f)
- assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(FloatType, -10.0f)
- valueFromQueryValueArray(1) = encoder.encode(FloatType, -20.0f)
- assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
- }
-
- test("String Type") {
- val leftLogic = new LessThanLogicExpression("Col1", 0)
- leftLogic.setEncoder(encoder)
- val rightLogic = new GreaterThanLogicExpression("Col1", 1)
- rightLogic.setEncoder(encoder)
- val andLogic = new AndLogicExpression(leftLogic, rightLogic)
-
- val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
-
- columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes("row005")))
- val valueFromQueryValueArray = new Array[Array[Byte]](2)
- valueFromQueryValueArray(0) = encoder.encode(StringType, "row015")
- valueFromQueryValueArray(1) = encoder.encode(StringType, "row000")
- assert(andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(StringType, "row004")
- valueFromQueryValueArray(1) = encoder.encode(StringType, "row000")
- assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(StringType, "row020")
- valueFromQueryValueArray(1) = encoder.encode(StringType, "row010")
- assert(!andLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- val expressionString = andLogic.toExpressionString
- // Note that here 0 and 1 are indexes, not values.
- assert(expressionString.equals("( Col1 < 0 AND Col1 > 1 )"))
-
- val builtExpression = DynamicLogicExpressionBuilder.build(expressionString, encoder)
- valueFromQueryValueArray(0) = encoder.encode(StringType, "row015")
- valueFromQueryValueArray(1) = encoder.encode(StringType, "row000")
- assert(builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(StringType, "row004")
- valueFromQueryValueArray(1) = encoder.encode(StringType, "row000")
- assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
-
- valueFromQueryValueArray(0) = encoder.encode(StringType, "row020")
- valueFromQueryValueArray(1) = encoder.encode(StringType, "row010")
- assert(!builtExpression.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
- }
-
- test("Boolean Type") {
- val leftLogic = new LessThanLogicExpression("Col1", 0)
- leftLogic.setEncoder(encoder)
- val rightLogic = new GreaterThanLogicExpression("Col1", 1)
- rightLogic.setEncoder(encoder)
-
- val columnToCurrentRowValueMap = new util.HashMap[String, ByteArrayComparable]()
-
- columnToCurrentRowValueMap.put("Col1", new ByteArrayComparable(Bytes.toBytes(false)))
- val valueFromQueryValueArray = new Array[Array[Byte]](2)
- valueFromQueryValueArray(0) = encoder.encode(BooleanType, true)
- valueFromQueryValueArray(1) = encoder.encode(BooleanType, false)
- assert(leftLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
- assert(!rightLogic.execute(columnToCurrentRowValueMap, valueFromQueryValueArray))
- }
-}
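
As a quick reference, the expression machinery exercised above can be used on its own: build a composite expression whose numeric operands are indexes into an array of encoded query values, round-trip it through its string form, and evaluate it against a row. The sketch below assumes the same package and classes as the suite (LessThanLogicExpression, GreaterThanLogicExpression, AndLogicExpression, ByteArrayComparable, DynamicLogicExpressionBuilder); the concrete values are illustrative.

  import java.util
  import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, JavaBytesEncoder}
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.sql.types.IntegerType

  val encoder = JavaBytesEncoder.create(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
  val lt = new LessThanLogicExpression("Col1", 0);    lt.setEncoder(encoder)
  val gt = new GreaterThanLogicExpression("Col1", 1); gt.setEncoder(encoder)
  val and = new AndLogicExpression(lt, gt)            // "( Col1 < 0 AND Col1 > 1 )", 0/1 are indexes

  val row = new util.HashMap[String, ByteArrayComparable]()
  row.put("Col1", new ByteArrayComparable(Bytes.toBytes(10)))    // current row value: 10
  val bounds = Array(encoder.encode(IntegerType, 15),            // index 0 -> upper bound 15
                     encoder.encode(IntegerType, 5))             // index 1 -> lower bound 5

  val rebuilt = DynamicLogicExpressionBuilder.build(and.toExpressionString, encoder)
  assert(rebuilt.execute(row, bounds))                           // 5 < 10 < 15, so this holds
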
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ffeb54a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
deleted file mode 100644
index 74bf912..0000000
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import org.apache.hadoop.hbase.spark.datasources.{DataTypeParserWrapper, DoubleSerDes, HBaseTableCatalog}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.sql.types._
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-
-class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll with Logging {
-
- val map = s"""MAP<int, struct<varchar:string>>"""
- val array = s"""array<struct<tinYint:tinyint>>"""
- val arrayMap = s"""MAp<int, ARRAY<double>>"""
- val catalog = s"""{
- |"table":{"namespace":"default", "name":"htable"},
- |"rowkey":"key1:key2",
- |"columns":{
- |"col1":{"cf":"rowkey", "col":"key1", "type":"string"},
- |"col2":{"cf":"rowkey", "col":"key2", "type":"double"},
- |"col3":{"cf":"cf1", "col":"col2", "type":"binary"},
- |"col4":{"cf":"cf1", "col":"col3", "type":"timestamp"},
- |"col5":{"cf":"cf1", "col":"col4", "type":"double", "serdes":"${classOf[DoubleSerDes].getName}"},
- |"col6":{"cf":"cf1", "col":"col5", "type":"$map"},
- |"col7":{"cf":"cf1", "col":"col6", "type":"$array"},
- |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"}
- |}
- |}""".stripMargin
- val parameters = Map(HBaseTableCatalog.tableCatalog->catalog)
- val t = HBaseTableCatalog(parameters)
-
- def checkDataType(dataTypeString: String, expectedDataType: DataType): Unit = {
- test(s"parse ${dataTypeString.replace("\n", "")}") {
- assert(DataTypeParserWrapper.parse(dataTypeString) === expectedDataType)
- }
- }
- test("basic") {
- assert(t.getField("col1").isRowKey == true)
- assert(t.getPrimaryKey == "key1")
- assert(t.getField("col3").dt == BinaryType)
- assert(t.getField("col4").dt == TimestampType)
- assert(t.getField("col5").dt == DoubleType)
- assert(t.getField("col5").serdes != None)
- assert(t.getField("col4").serdes == None)
- assert(t.getField("col1").isRowKey)
- assert(t.getField("col2").isRowKey)
- assert(!t.getField("col3").isRowKey)
- assert(t.getField("col2").length == Bytes.SIZEOF_DOUBLE)
- assert(t.getField("col1").length == -1)
- assert(t.getField("col8").length == -1)
- }
-
- checkDataType(
- map,
- t.getField("col6").dt
- )
-
- checkDataType(
- array,
- t.getField("col7").dt
- )
-
- checkDataType(
- arrayMap,
- t.getField("col8").dt
- )
-
- test("convert") {
- val m = Map("hbase.columns.mapping" ->
- "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD DOUBLE c:b, C_FIELD BINARY c:c,",
- "hbase.table" -> "t1")
- val map = HBaseTableCatalog.convert(m)
- val json = map.get(HBaseTableCatalog.tableCatalog).get
- val parameters = Map(HBaseTableCatalog.tableCatalog->json)
- val t = HBaseTableCatalog(parameters)
- assert(t.getField("KEY_FIELD").isRowKey)
- assert(DataTypeParserWrapper.parse("STRING") === t.getField("A_FIELD").dt)
- assert(!t.getField("A_FIELD").isRowKey)
- assert(DataTypeParserWrapper.parse("DOUBLE") === t.getField("B_FIELD").dt)
- assert(DataTypeParserWrapper.parse("BINARY") === t.getField("C_FIELD").dt)
- }
-
- test("compatiblity") {
- val m = Map("hbase.columns.mapping" ->
- "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD DOUBLE c:b, C_FIELD BINARY c:c,",
- "hbase.table" -> "t1")
- val t = HBaseTableCatalog(m)
- assert(t.getField("KEY_FIELD").isRowKey)
- assert(DataTypeParserWrapper.parse("STRING") === t.getField("A_FIELD").dt)
- assert(!t.getField("A_FIELD").isRowKey)
- assert(DataTypeParserWrapper.parse("DOUBLE") === t.getField("B_FIELD").dt)
- assert(DataTypeParserWrapper.parse("BINARY") === t.getField("C_FIELD").dt)
- }
-}
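
For context, the JSON catalog exercised in the suite above is the same format the hbase-spark data source consumes at read time. Below is a minimal sketch of loading a DataFrame against such a catalog, assuming a live SQLContext and an existing HBase table that matches the mapping; the helper name is illustrative and not part of the removed code.

  import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
  import org.apache.spark.sql.{DataFrame, SQLContext}

  // Sketch only: read a DataFrame using a JSON catalog like the one defined
  // in HBaseCatalogSuite. `sqlContext` and `catalog` are assumed to be in scope.
  def withCatalog(sqlContext: SQLContext, catalog: String): DataFrame = {
    sqlContext
      .read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.hadoop.hbase.spark")
      .load()
  }

A call like withCatalog(sqlContext, catalog) would return a DataFrame whose columns follow the "columns" section of the catalog, with the "rowkey" fields forming the composite row key.
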
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ffeb54a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
deleted file mode 100644
index 5b42bd9..0000000
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util.concurrent.ExecutorService
-import scala.util.Random
-
-import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator,
- Connection, BufferedMutatorParams, Admin, TableBuilder}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.TableName
-import org.scalatest.FunSuite
-
-case class HBaseConnectionKeyMocker (confId: Int) extends HBaseConnectionKey (null) {
- override def hashCode: Int = {
- confId
- }
-
- override def equals(obj: Any): Boolean = {
- if(!obj.isInstanceOf[HBaseConnectionKeyMocker])
- false
- else
- confId == obj.asInstanceOf[HBaseConnectionKeyMocker].confId
- }
-}
-
-class ConnectionMocker extends Connection {
- var isClosed: Boolean = false
-
- def getRegionLocator (tableName: TableName): RegionLocator = null
- def getConfiguration: Configuration = null
- def getTable (tableName: TableName): Table = null
- def getTable(tableName: TableName, pool: ExecutorService): Table = null
- def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null
- def getBufferedMutator (tableName: TableName): BufferedMutator = null
- def getAdmin: Admin = null
- def getTableBuilder(tableName: TableName, pool: ExecutorService): TableBuilder = null
-
- def close(): Unit = {
- if (isClosed)
- throw new IllegalStateException()
- isClosed = true
- }
-
- def isAborted: Boolean = true
- def abort(why: String, e: Throwable) = {}
-}
-
-class HBaseConnectionCacheSuite extends FunSuite with Logging {
- /*
- * These tests must be performed sequentially as they operate on a
- * unique running thread and resource.
- *
- * It looks like there's no way to tell FunSuite to do so, so those
- * test cases are written as normal functions that are called sequentially
- * from a single test case.
- */
- test("all test cases") {
- testBasic()
- testWithPressureWithoutClose()
- testWithPressureWithClose()
- }
-
- def cleanEnv() {
- HBaseConnectionCache.connectionMap.synchronized {
- HBaseConnectionCache.connectionMap.clear()
- HBaseConnectionCache.cacheStat.numActiveConnections = 0
- HBaseConnectionCache.cacheStat.numActualConnectionsCreated = 0
- HBaseConnectionCache.cacheStat.numTotalRequests = 0
- }
- }
-
- def testBasic() {
- cleanEnv()
- HBaseConnectionCache.setTimeout(1 * 1000)
-
- val connKeyMocker1 = new HBaseConnectionKeyMocker(1)
- val connKeyMocker1a = new HBaseConnectionKeyMocker(1)
- val connKeyMocker2 = new HBaseConnectionKeyMocker(2)
-
- val c1 = HBaseConnectionCache
- .getConnection(connKeyMocker1, new ConnectionMocker)
-
- assert(HBaseConnectionCache.connectionMap.size === 1)
- assert(HBaseConnectionCache.getStat.numTotalRequests === 1)
- assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1)
- assert(HBaseConnectionCache.getStat.numActiveConnections === 1)
-
- val c1a = HBaseConnectionCache
- .getConnection(connKeyMocker1a, new ConnectionMocker)
-
- HBaseConnectionCache.connectionMap.synchronized {
- assert(HBaseConnectionCache.connectionMap.size === 1)
- assert(HBaseConnectionCache.getStat.numTotalRequests === 2)
- assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1)
- assert(HBaseConnectionCache.getStat.numActiveConnections === 1)
- }
-
- val c2 = HBaseConnectionCache
- .getConnection(connKeyMocker2, new ConnectionMocker)
-
- HBaseConnectionCache.connectionMap.synchronized {
- assert(HBaseConnectionCache.connectionMap.size === 2)
- assert(HBaseConnectionCache.getStat.numTotalRequests === 3)
- assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 2)
- assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
- }
-
- c1.close()
- HBaseConnectionCache.connectionMap.synchronized {
- assert(HBaseConnectionCache.connectionMap.size === 2)
- assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
- }
-
- c1a.close()
- HBaseConnectionCache.connectionMap.synchronized {
- assert(HBaseConnectionCache.connectionMap.size === 2)
- assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
- }
-
- Thread.sleep(3 * 1000) // Leave housekeeping thread enough time
- HBaseConnectionCache.connectionMap.synchronized {
- assert(HBaseConnectionCache.connectionMap.size === 1)
- assert(HBaseConnectionCache.connectionMap.iterator.next()._1
- .asInstanceOf[HBaseConnectionKeyMocker].confId === 2)
- assert(HBaseConnectionCache.getStat.numActiveConnections === 1)
- }
-
- c2.close()
- }
-
- def testWithPressureWithoutClose() {
- cleanEnv()
-
- class TestThread extends Runnable {
- override def run() {
- for (i <- 0 to 999) {
- val c = HBaseConnectionCache.getConnection(
- new HBaseConnectionKeyMocker(Random.nextInt(10)), new ConnectionMocker)
- }
- }
- }
-
- HBaseConnectionCache.setTimeout(500)
- val threads: Array[Thread] = new Array[Thread](100)
- for (i <- 0 to 99) {
- threads.update(i, new Thread(new TestThread()))
- threads(i).run()
- }
- try {
- threads.foreach { x => x.join() }
- } catch {
- case e: InterruptedException => println(e.getMessage)
- }
-
- Thread.sleep(1000)
- HBaseConnectionCache.connectionMap.synchronized {
- assert(HBaseConnectionCache.connectionMap.size === 10)
- assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000)
- assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
- assert(HBaseConnectionCache.getStat.numActiveConnections === 10)
-
- var totalRc : Int = 0
- HBaseConnectionCache.connectionMap.foreach {
- x => totalRc += x._2.refCount
- }
- assert(totalRc === 100 * 1000)
- HBaseConnectionCache.connectionMap.foreach {
- x => {
- x._2.refCount = 0
- x._2.timestamp = System.currentTimeMillis() - 1000
- }
- }
- }
- Thread.sleep(1000)
- assert(HBaseConnectionCache.connectionMap.size === 0)
- assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
- assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
- }
-
- def testWithPressureWithClose() {
- cleanEnv()
-
- class TestThread extends Runnable {
- override def run() {
- for (i <- 0 to 999) {
- val c = HBaseConnectionCache.getConnection(
- new HBaseConnectionKeyMocker(Random.nextInt(10)), new ConnectionMocker)
- Thread.`yield`()
- c.close()
- }
- }
- }
-
- HBaseConnectionCache.setTimeout(3 * 1000)
- val threads: Array[Thread] = new Array[Thread](100)
- for (i <- threads.indices) {
- threads.update(i, new Thread(new TestThread()))
- threads(i).run()
- }
- try {
- threads.foreach { x => x.join() }
- } catch {
- case e: InterruptedException => println(e.getMessage)
- }
-
- HBaseConnectionCache.connectionMap.synchronized {
- assert(HBaseConnectionCache.connectionMap.size === 10)
- assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000)
- assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
- assert(HBaseConnectionCache.getStat.numActiveConnections === 10)
- }
-
- Thread.sleep(6 * 1000)
- HBaseConnectionCache.connectionMap.synchronized {
- assert(HBaseConnectionCache.connectionMap.size === 0)
- assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
- assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
- }
- }
-}
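
The suite above verifies a reference-counted connection cache whose housekeeping thread closes idle, unreferenced connections after a timeout. As a rough sketch of that pattern only, not the actual HBaseConnectionCache implementation, the class and method names below are assumptions:

  import scala.collection.mutable

  // Illustrative, simplified reference-counted cache in the spirit of what
  // HBaseConnectionCacheSuite exercises.
  class RefCountedCache[K, V](timeoutMs: Long, closer: V => Unit) {
    case class Entry(value: V, var refCount: Int, var timestamp: Long)
    val map = new mutable.HashMap[K, Entry]()

    def acquire(key: K, create: => V): V = map.synchronized {
      val e = map.getOrElseUpdate(key, Entry(create, 0, System.currentTimeMillis()))
      e.refCount += 1
      e.value
    }

    def release(key: K): Unit = map.synchronized {
      map.get(key).foreach { e =>
        e.refCount = math.max(0, e.refCount - 1)
        e.timestamp = System.currentTimeMillis()
      }
    }

    // Called periodically by a housekeeping thread: close entries that are
    // unreferenced and have been idle longer than the timeout.
    def sweep(): Unit = map.synchronized {
      val now = System.currentTimeMillis()
      val stale = map.filter { case (_, e) => e.refCount <= 0 && now - e.timestamp > timeoutMs }
      stale.keys.foreach { k => closer(map(k).value); map.remove(k) }
    }
  }

The assertions in testBasic and the pressure tests map onto this shape: acquire bumps the reference count and request stats, release drops the count, and only the sweep removes entries once they are both unreferenced and past the timeout.
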
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ffeb54a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala
deleted file mode 100644
index 83e2ac6..0000000
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark
-
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{ CellUtil, TableName, HBaseTestingUtility}
-import org.apache.spark.{SparkException, SparkContext}
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-
-class HBaseContextSuite extends FunSuite with
-BeforeAndAfterEach with BeforeAndAfterAll with Logging {
-
- @transient var sc: SparkContext = null
- var TEST_UTIL = new HBaseTestingUtility
-
- val tableName = "t1"
- val columnFamily = "c"
-
- override def beforeAll() {
- TEST_UTIL.startMiniCluster()
- logInfo(" - minicluster started")
-
- try {
- TEST_UTIL.deleteTable(TableName.valueOf(tableName))
- } catch {
- case e: Exception =>
- logInfo(" - no table " + tableName + " found")
- }
- logInfo(" - creating table " + tableName)
- TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
- logInfo(" - created table")
-
- val envMap = Map[String,String](("Xmx", "512m"))
-
- sc = new SparkContext("local", "test", null, Nil, envMap)
- }
-
- override def afterAll() {
- logInfo("shuting down minicluster")
- TEST_UTIL.shutdownMiniCluster()
- logInfo(" - minicluster shut down")
- TEST_UTIL.cleanupTestDir()
- sc.stop()
- }
-
- test("bulkput to test HBase client") {
- val config = TEST_UTIL.getConfiguration
- val rdd = sc.parallelize(Array(
- (Bytes.toBytes("1"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
- (Bytes.toBytes("2"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))),
- (Bytes.toBytes("3"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))),
- (Bytes.toBytes("4"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))),
- (Bytes.toBytes("5"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
-
- val hbaseContext = new HBaseContext(sc, config)
- hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
- TableName.valueOf(tableName),
- (putRecord) => {
- val put = new Put(putRecord._1)
- putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
- put
- })
-
- val connection = ConnectionFactory.createConnection(config)
- val table = connection.getTable(TableName.valueOf("t1"))
-
- try {
- val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))).
- getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a"))))
- assert(foo1 == "foo1")
-
- val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))).
- getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b"))))
- assert(foo2 == "foo2")
-
- val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))).
- getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c"))))
- assert(foo3 == "foo3")
-
- val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))).
- getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d"))))
- assert(foo4 == "foo")
-
- val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))).
- getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e"))))
- assert(foo5 == "bar")
-
- } finally {
- table.close()
- connection.close()
- }
- }
-
- test("bulkDelete to test HBase client") {
- val config = TEST_UTIL.getConfiguration
- val connection = ConnectionFactory.createConnection(config)
- val table = connection.getTable(TableName.valueOf("t1"))
-
- try {
- var put = new Put(Bytes.toBytes("delete1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
- table.put(put)
- put = new Put(Bytes.toBytes("delete2"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
- table.put(put)
- put = new Put(Bytes.toBytes("delete3"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
- table.put(put)
-
- val rdd = sc.parallelize(Array(
- Bytes.toBytes("delete1"),
- Bytes.toBytes("delete3")))
-
- val hbaseContext = new HBaseContext(sc, config)
- hbaseContext.bulkDelete[Array[Byte]](rdd,
- TableName.valueOf(tableName),
- putRecord => new Delete(putRecord),
- 4)
-
- assert(table.get(new Get(Bytes.toBytes("delete1"))).
- getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null)
- assert(table.get(new Get(Bytes.toBytes("delete3"))).
- getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null)
- assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("delete2"))).
- getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))).equals("foo2"))
- } finally {
- table.close()
- connection.close()
- }
- }
-
- test("bulkGet to test HBase client") {
- val config = TEST_UTIL.getConfiguration
- val connection = ConnectionFactory.createConnection(config)
- val table = connection.getTable(TableName.valueOf("t1"))
-
- try {
- var put = new Put(Bytes.toBytes("get1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
- table.put(put)
- put = new Put(Bytes.toBytes("get2"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
- table.put(put)
- put = new Put(Bytes.toBytes("get3"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
- table.put(put)
- } finally {
- table.close()
- connection.close()
- }
- val rdd = sc.parallelize(Array(
- Bytes.toBytes("get1"),
- Bytes.toBytes("get2"),
- Bytes.toBytes("get3"),
- Bytes.toBytes("get4")))
- val hbaseContext = new HBaseContext(sc, config)
-
- val getRdd = hbaseContext.bulkGet[Array[Byte], String](
- TableName.valueOf(tableName),
- 2,
- rdd,
- record => {
- new Get(record)
- },
- (result: Result) => {
- if (result.listCells() != null) {
- val it = result.listCells().iterator()
- val B = new StringBuilder
-
- B.append(Bytes.toString(result.getRow) + ":")
-
- while (it.hasNext) {
- val cell = it.next()
- val q = Bytes.toString(CellUtil.cloneQualifier(cell))
- if (q.equals("counter")) {
- B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
- } else {
- B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
- }
- }
- "" + B.toString
- } else {
- ""
- }
- })
- val getArray = getRdd.collect()
-
- assert(getArray.length == 4)
- assert(getArray.contains("get1:(a,foo1)"))
- assert(getArray.contains("get2:(a,foo2)"))
- assert(getArray.contains("get3:(a,foo3)"))
-
- }
-
- test("BulkGet failure test: bad table") {
- val config = TEST_UTIL.getConfiguration
-
- val rdd = sc.parallelize(Array(
- Bytes.toBytes("get1"),
- Bytes.toBytes("get2"),
- Bytes.toBytes("get3"),
- Bytes.toBytes("get4")))
- val hbaseContext = new HBaseContext(sc, config)
-
- intercept[SparkException] {
- try {
- val getRdd = hbaseContext.bulkGet[Array[Byte], String](
- TableName.valueOf("badTableName"),
- 2,
- rdd,
- record => {
- new Get(record)
- },
- (result: Result) => "1")
-
- getRdd.collect()
-
- fail("We should have failed and not reached this line")
- } catch {
- case ex: SparkException => {
- assert(
- ex.getMessage.contains(
- "org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException"))
- throw ex
- }
- }
- }
- }
-
- test("BulkGet failure test: bad column") {
-
- val config = TEST_UTIL.getConfiguration
- val connection = ConnectionFactory.createConnection(config)
- val table = connection.getTable(TableName.valueOf("t1"))
-
- try {
- var put = new Put(Bytes.toBytes("get1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
- table.put(put)
- put = new Put(Bytes.toBytes("get2"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
- table.put(put)
- put = new Put(Bytes.toBytes("get3"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
- table.put(put)
- } finally {
- table.close()
- connection.close()
- }
-
- val rdd = sc.parallelize(Array(
- Bytes.toBytes("get1"),
- Bytes.toBytes("get2"),
- Bytes.toBytes("get3"),
- Bytes.toBytes("get4")))
- val hbaseContext = new HBaseContext(sc, config)
-
- val getRdd = hbaseContext.bulkGet[Array[Byte], String](
- TableName.valueOf(tableName),
- 2,
- rdd,
- record => {
- new Get(record)
- },
- (result: Result) => {
- if (result.listCells() != null) {
- val cellValue = result.getColumnLatestCell(
- Bytes.toBytes("c"), Bytes.toBytes("bad_column"))
- if (cellValue == null) "null" else "bad"
- } else "noValue"
- })
- var nullCounter = 0
- var noValueCounter = 0
- getRdd.collect().foreach(r => {
- if ("null".equals(r)) nullCounter += 1
- else if ("noValue".equals(r)) noValueCounter += 1
- })
- assert(nullCounter == 3)
- assert(noValueCounter == 1)
- }
-
- test("distributedScan to test HBase client") {
- val config = TEST_UTIL.getConfiguration
- val connection = ConnectionFactory.createConnection(config)
- val table = connection.getTable(TableName.valueOf("t1"))
-
- try {
- var put = new Put(Bytes.toBytes("scan1"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
- table.put(put)
- put = new Put(Bytes.toBytes("scan2"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
- table.put(put)
- put = new Put(Bytes.toBytes("scan2"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo-2"))
- table.put(put)
- put = new Put(Bytes.toBytes("scan3"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
- table.put(put)
- put = new Put(Bytes.toBytes("scan4"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
- table.put(put)
- put = new Put(Bytes.toBytes("scan5"))
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
- table.put(put)
- } finally {
- table.close()
- connection.close()
- }
-
- val hbaseContext = new HBaseContext(sc, config)
-
- val scan = new Scan()
- val filter = new FirstKeyOnlyFilter()
- scan.setCaching(100)
- scan.setStartRow(Bytes.toBytes("scan2"))
- scan.setStopRow(Bytes.toBytes("scan4_"))
- scan.setFilter(filter)
-
- val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
-
- try {
- val scanList = scanRdd.map(r => r._1.copyBytes()).collect()
- assert(scanList.length == 3)
- var cnt = 0
- scanRdd.map(r => r._2.listCells().size()).collect().foreach(l => {
- cnt += l
- })
- // the number of cells returned would be 4 without the Filter
- assert(cnt == 3);
- } catch {
- case ex: Exception => ex.printStackTrace()
- }
- }
-}
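
Outside of a test harness, the bulk operations exercised above follow the same shape. A minimal sketch of a bulkPut against an existing table, assuming an initialized SparkContext and HBase Configuration (the table, column family, and helper name here are placeholders, not part of the removed suite):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.Put
  import org.apache.hadoop.hbase.spark.HBaseContext
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.SparkContext

  // Sketch only: write (rowKey, qualifier, value) triples to column family "c"
  // of table "t1" using HBaseContext.bulkPut. `sc` and `config` are assumed
  // to be a live SparkContext and HBase Configuration.
  def writeRows(sc: SparkContext, config: Configuration): Unit = {
    val rows = Seq(("1", "a", "foo1"), ("2", "b", "foo2"))
    val rdd = sc.parallelize(rows)
    val hbaseContext = new HBaseContext(sc, config)
    hbaseContext.bulkPut[(String, String, String)](
      rdd,
      TableName.valueOf("t1"),
      (r: (String, String, String)) => {
        val put = new Put(Bytes.toBytes(r._1))
        put.addColumn(Bytes.toBytes("c"), Bytes.toBytes(r._2), Bytes.toBytes(r._3))
        put
      })
  }

bulkDelete and bulkGet follow the same pattern, taking the RDD of row keys plus a function that builds the corresponding Delete or Get, as the tests above demonstrate.
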