You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/09 04:59:49 UTC

[3/9] hbase git commit: HBASE-18817 pull the hbase-spark module out of branch-2.

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
deleted file mode 100644
index a427327..0000000
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
+++ /dev/null
@@ -1,956 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hbase.client.{Get, ConnectionFactory}
-import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile}
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
-import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.spark.{SparkContext, Logging}
-import org.junit.rules.TemporaryFolder
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-
-class BulkLoadSuite extends FunSuite with
-BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
-  @transient var sc: SparkContext = null  // assigned in beforeAll, stopped in afterAll
-  var TEST_UTIL = new HBaseTestingUtility  // NOTE(review): never reassigned in the visible code
-
-  val tableName = "t1"  // shared name; tests that need the table create it and drop it again
-  val columnFamily1 = "f1"
-  val columnFamily2 = "f2"
-  val testFolder = new TemporaryFolder()  // create()/delete() are called by hand inside the tests
-
-
-  override def beforeAll() {  // one-time setup: start the mini cluster, then a local SparkContext
-    TEST_UTIL.startMiniCluster()
-    logInfo(" - minicluster started")
-
-    try {  // best-effort removal of a leftover table; a failure is only logged
-      TEST_UTIL.deleteTable(TableName.valueOf(tableName))
-    } catch {
-      case e: Exception =>
-        logInfo(" - no table " + tableName + " found")
-    }
-
-    logInfo(" - created table")  // NOTE(review): misleading, no table is created in this method
-
-    val envMap = Map[String,String](("Xmx", "512m"))
-
-    sc = new SparkContext("local", "test", null, Nil, envMap)  // master "local", app name "test"
-  }
-
-  override def afterAll() {  // one-time teardown: mini cluster, test dir, then the SparkContext
-    logInfo("shuting down minicluster")
-    TEST_UTIL.shutdownMiniCluster()
-    logInfo(" - minicluster shut down")
-    TEST_UTIL.cleanupTestDir()
-    sc.stop()  // NOTE(review): no finally, so skipped if a call above throws; sc is null if beforeAll failed early
-  }
-
-  test("Wide Row Bulk Load: Test multi family and multi column tests " +
-    "with all default HFile Configs.") {
-    val config = TEST_UTIL.getConfiguration
-
-    logInfo(" - creating table " + tableName)
-    TEST_UTIL.createTable(TableName.valueOf(tableName),
-      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
-
-    // The input below exercises several cases at once:
-    // 1. Row keys are not in order
-    // 2. Qualifiers are not in order
-    // 3. Column families are not in order
-    // 4. Some rows have records in one column family and some in two column families
-    // 5. Some rows have a single qualifier and some have two
-    val rdd = sc.parallelize(Array(
-      (Bytes.toBytes("1"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
-      (Bytes.toBytes("3"),
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))),
-      (Bytes.toBytes("3"),
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))),
-      (Bytes.toBytes("3"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))),
-      (Bytes.toBytes("5"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
-      (Bytes.toBytes("4"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
-      (Bytes.toBytes("4"),
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
-      (Bytes.toBytes("2"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
-      (Bytes.toBytes("2"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))
-
-
-
-    val hbaseContext = new HBaseContext(sc, config)
-
-    testFolder.create()  // TemporaryFolder is driven by hand; the matching delete() is in the finally
-    val stagingFolder = testFolder.newFolder()
-
-    hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))](rdd,
-      TableName.valueOf(tableName),
-      t => {  // maps one input tuple to a single (KeyFamilyQualifier, value) pair
-        val rowKey = t._1
-        val family:Array[Byte] = t._2._1
-        val qualifier = t._2._2
-        val value:Array[Byte] = t._2._3
-
-        val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
-
-        Seq((keyFamilyQualifier, value)).iterator
-      },
-      stagingFolder.getPath)
-
-    val fs = FileSystem.get(config)
-    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)  // presumably one folder per family (f1, f2)
-
-    val conn = ConnectionFactory.createConnection(config)  // NOTE(review): conn is never closed in this test
-
-    val load = new LoadIncrementalHFiles(config)
-    val table = conn.getTable(TableName.valueOf(tableName))
-    try {
-      load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table,
-        conn.getRegionLocator(TableName.valueOf(tableName)))
-
-      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
-      assert(cells5.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
-
-      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
-      assert(cells4.size == 2)  // asserted order below: f1:a, f2:b
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
-
-      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
-      assert(cells3.size == 3)  // asserted order below: f1:a, f2:a, f2:b (input order was f2:b, f2:a, f1:a)
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
-
-
-      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
-      assert(cells2.size == 2)
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
-
-      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
-      assert(cells1.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
-
-    } finally {  // cleanup runs even when an assertion above fails
-      table.close()
-      val admin = ConnectionFactory.createConnection(config).getAdmin  // NOTE(review): second connection, never closed explicitly
-      try {
-        admin.disableTable(TableName.valueOf(tableName))
-        admin.deleteTable(TableName.valueOf(tableName))
-      } finally {
-        admin.close()
-      }
-      fs.delete(new Path(stagingFolder.getPath), true)
-
-      testFolder.delete()
-
-    }
-  }
-
-  test("Wide Row Bulk Load: Test HBase client: Test Roll Over and " +
-    "using an implicit call to bulk load") {
-    val config = TEST_UTIL.getConfiguration
-
-    logInfo(" - creating table " + tableName)
-    TEST_UTIL.createTable(TableName.valueOf(tableName),
-      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
-
-    // The input below exercises several cases at once:
-    // 1. Row keys are not in order
-    // 2. Qualifiers are not in order
-    // 3. Every record uses columnFamily1; columnFamily2 exists on the table but gets no data
-    // 4. Rows carry one, two, or three qualifiers
-    // 5. Row "3" is the only row with three qualifiers (given as b, a, c)
-    val rdd = sc.parallelize(Array(
-      (Bytes.toBytes("1"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
-      (Bytes.toBytes("3"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))),
-      (Bytes.toBytes("3"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a"))),
-      (Bytes.toBytes("3"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c"))),
-      (Bytes.toBytes("5"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
-      (Bytes.toBytes("4"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
-      (Bytes.toBytes("4"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
-      (Bytes.toBytes("2"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
-      (Bytes.toBytes("2"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))
-
-    val hbaseContext = new HBaseContext(sc, config)
-
-    testFolder.create()  // TemporaryFolder is driven by hand; the matching delete() is in the finally
-    val stagingFolder = testFolder.newFolder()
-
-    rdd.hbaseBulkLoad(hbaseContext,  // the "implicit call" from the test name: invoked on the RDD itself
-      TableName.valueOf(tableName),
-      t => {  // maps one input tuple to a single (KeyFamilyQualifier, value) pair
-        val rowKey = t._1
-        val family:Array[Byte] = t._2._1
-        val qualifier = t._2._2
-        val value = t._2._3
-
-        val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
-
-        Seq((keyFamilyQualifier, value)).iterator
-      },
-      stagingFolder.getPath,
-      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
-      compactionExclude = false,
-      20)  // presumably a tiny max HFile size so the writer rolls over (per the test name) - TODO confirm
-
-    val fs = FileSystem.get(config)
-    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 1)  // presumably just the f1 folder
-
-    assert(fs.listStatus(new Path(stagingFolder.getPath+ "/f1")).length == 5)  // presumably the rolled-over HFiles
-
-    val conn = ConnectionFactory.createConnection(config)  // NOTE(review): conn is never closed in this test
-
-    val load = new LoadIncrementalHFiles(config)
-    val table = conn.getTable(TableName.valueOf(tableName))
-    try {
-      load.doBulkLoad(new Path(stagingFolder.getPath),
-        conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
-
-      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
-      assert(cells5.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
-
-      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
-      assert(cells4.size == 2)
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
-
-      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
-      assert(cells3.size == 3)  // asserted order below: f1:a, f1:b, f1:c (input order was b, a, c)
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.a"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("b"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.c"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("c"))
-
-      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
-      assert(cells2.size == 2)
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
-
-      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
-      assert(cells1.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
-
-    } finally {  // cleanup runs even when an assertion above fails
-      table.close()
-      val admin = ConnectionFactory.createConnection(config).getAdmin  // NOTE(review): second connection, never closed explicitly
-      try {
-        admin.disableTable(TableName.valueOf(tableName))
-        admin.deleteTable(TableName.valueOf(tableName))
-      } finally {
-        admin.close()
-      }
-      fs.delete(new Path(stagingFolder.getPath), true)
-
-      testFolder.delete()
-    }
-  }
-
-  test("Wide Row Bulk Load: Test multi family and multi column tests" +
-    " with one column family with custom configs plus multi region") {
-    val config = TEST_UTIL.getConfiguration
-
-    val splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](2)  // table is pre-split at "2" and "4"
-    splitKeys(0) = Bytes.toBytes("2")
-    splitKeys(1) = Bytes.toBytes("4")
-
-    logInfo(" - creating table " + tableName)
-    TEST_UTIL.createTable(TableName.valueOf(tableName),
-      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)),
-      splitKeys)
-
-    // The input below exercises several cases at once:
-    // 1. Row keys are not in order
-    // 2. Qualifiers are not in order
-    // 3. Column families are not in order
-    // 4. Some rows have records in one column family and some in two column families
-    // 5. Some rows have a single qualifier and some have two
-    val rdd = sc.parallelize(Array(
-      (Bytes.toBytes("1"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
-      (Bytes.toBytes("3"),
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))),
-      (Bytes.toBytes("3"),
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))),
-      (Bytes.toBytes("3"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))),
-      (Bytes.toBytes("5"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
-      (Bytes.toBytes("4"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
-      (Bytes.toBytes("4"),
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
-      (Bytes.toBytes("2"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
-      (Bytes.toBytes("2"),
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))
-
-    val hbaseContext = new HBaseContext(sc, config)
-
-    testFolder.create()  // TemporaryFolder is driven by hand; the matching delete() is in the finally
-    val stagingFolder = testFolder.newFolder()
-
-    val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
-
-    val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128,  // NOTE(review): args look like compression, bloom type, block size
-      "PREFIX")  // ...and data block encoding - confirm against FamilyHFileWriteOptions
-
-    familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)  // only f1 gets custom options
-
-    hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))](rdd,
-      TableName.valueOf(tableName),
-      t => {  // maps one input tuple to a single (KeyFamilyQualifier, value) pair
-        val rowKey = t._1
-        val family:Array[Byte] = t._2._1
-        val qualifier = t._2._2
-        val value = t._2._3
-
-        val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
-
-        Seq((keyFamilyQualifier, value)).iterator
-      },
-      stagingFolder.getPath,
-      familyHBaseWriterOptions,
-      compactionExclude = false,
-      HConstants.DEFAULT_MAX_FILE_SIZE)
-
-    val fs = FileSystem.get(config)
-    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)  // the f1 and f2 folders listed below
-
-    val f1FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f1"))
-    for ( i <- 0 until f1FileList.length) {  // every f1 file must carry the custom options set above
-      val reader = HFile.createReader(fs, f1FileList(i).getPath,
-        new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("gz"))
-      assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
-    }
-
-    assert( 3 ==  f1FileList.length)  // presumably one f1 file per region (two split keys) - TODO confirm
-
-    val f2FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f2"))
-    for ( i <- 0 until f2FileList.length) {  // f2 has no entry in the options map, so no compression/encoding
-      val reader = HFile.createReader(fs, f2FileList(i).getPath,
-        new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("none"))
-      assert(reader.getDataBlockEncoding.name().equals("NONE"))
-    }
-
-    assert( 2 ==  f2FileList.length)  // f2 data exists only for rows "3" and "4"
-
-
-    val conn = ConnectionFactory.createConnection(config)  // NOTE(review): conn is never closed in this test
-
-    val load = new LoadIncrementalHFiles(config)
-    val table = conn.getTable(TableName.valueOf(tableName))
-    try {
-      load.doBulkLoad(new Path(stagingFolder.getPath),
-        conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
-
-      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
-      assert(cells5.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
-
-      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
-      assert(cells4.size == 2)  // asserted order below: f1:a, f2:b
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
-
-      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
-      assert(cells3.size == 3)  // asserted order below: f1:a, f2:a, f2:b (input order was f2:b, f2:a, f1:a)
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
-
-
-      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
-      assert(cells2.size == 2)
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
-
-      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
-      assert(cells1.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
-
-    } finally {  // cleanup runs even when an assertion above fails
-      table.close()
-      val admin = ConnectionFactory.createConnection(config).getAdmin  // NOTE(review): second connection, never closed explicitly
-      try {
-        admin.disableTable(TableName.valueOf(tableName))
-        admin.deleteTable(TableName.valueOf(tableName))
-      } finally {
-        admin.close()
-      }
-      fs.delete(new Path(stagingFolder.getPath), true)
-
-      testFolder.delete()
-
-    }
-  }
-
-  test("Test partitioner") {  // checks BulkLoadPartitioner.getPartition for three split-key layouts; no cluster use
-
-    var splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](3)  // layout 1: start keys "", "3", "7"
-    splitKeys(0) = Bytes.toBytes("")
-    splitKeys(1) = Bytes.toBytes("3")
-    splitKeys(2) = Bytes.toBytes("7")
-
-    var partitioner = new BulkLoadPartitioner(splitKeys)
-
-    assert(0 == partitioner.getPartition(Bytes.toBytes("")))
-    assert(0 == partitioner.getPartition(Bytes.toBytes("1")))
-    assert(0 == partitioner.getPartition(Bytes.toBytes("2")))
-    assert(1 == partitioner.getPartition(Bytes.toBytes("3")))  // a key equal to a split key belongs to that key's partition
-    assert(1 == partitioner.getPartition(Bytes.toBytes("4")))
-    assert(1 == partitioner.getPartition(Bytes.toBytes("6")))
-    assert(2 == partitioner.getPartition(Bytes.toBytes("7")))
-    assert(2 == partitioner.getPartition(Bytes.toBytes("8")))  // keys past the last split key stay in the last partition
-
-
-    splitKeys = new Array[Array[Byte]](1)  // layout 2: a single empty start key, so every key maps to partition 0
-    splitKeys(0) = Bytes.toBytes("")
-
-    partitioner = new BulkLoadPartitioner(splitKeys)
-
-    assert(0 == partitioner.getPartition(Bytes.toBytes("")))
-    assert(0 == partitioner.getPartition(Bytes.toBytes("1")))
-    assert(0 == partitioner.getPartition(Bytes.toBytes("2")))
-    assert(0 == partitioner.getPartition(Bytes.toBytes("3")))
-    assert(0 == partitioner.getPartition(Bytes.toBytes("4")))
-    assert(0 == partitioner.getPartition(Bytes.toBytes("6")))
-    assert(0 == partitioner.getPartition(Bytes.toBytes("7")))
-
-    splitKeys = new Array[Array[Byte]](7)  // layout 3: seven two-character start keys
-    splitKeys(0) = Bytes.toBytes("")
-    splitKeys(1) = Bytes.toBytes("02")
-    splitKeys(2) = Bytes.toBytes("04")
-    splitKeys(3) = Bytes.toBytes("06")
-    splitKeys(4) = Bytes.toBytes("08")
-    splitKeys(5) = Bytes.toBytes("10")
-    splitKeys(6) = Bytes.toBytes("12")
-
-    partitioner = new BulkLoadPartitioner(splitKeys)
-
-    assert(0 == partitioner.getPartition(Bytes.toBytes("")))  // below: each split key and the key just after it share a partition
-    assert(0 == partitioner.getPartition(Bytes.toBytes("01")))
-    assert(1 == partitioner.getPartition(Bytes.toBytes("02")))
-    assert(1 == partitioner.getPartition(Bytes.toBytes("03")))
-    assert(2 == partitioner.getPartition(Bytes.toBytes("04")))
-    assert(2 == partitioner.getPartition(Bytes.toBytes("05")))
-    assert(3 == partitioner.getPartition(Bytes.toBytes("06")))
-    assert(3 == partitioner.getPartition(Bytes.toBytes("07")))
-    assert(4 == partitioner.getPartition(Bytes.toBytes("08")))
-    assert(4 == partitioner.getPartition(Bytes.toBytes("09")))
-    assert(5 == partitioner.getPartition(Bytes.toBytes("10")))
-    assert(5 == partitioner.getPartition(Bytes.toBytes("11")))
-    assert(6 == partitioner.getPartition(Bytes.toBytes("12")))
-    assert(6 == partitioner.getPartition(Bytes.toBytes("13")))
-  }
-
-  test("Thin Row Bulk Load: Test multi family and multi column tests " +
-    "with all default HFile Configs") {
-    val config = TEST_UTIL.getConfiguration
-
-    logInfo(" - creating table " + tableName)
-    TEST_UTIL.createTable(TableName.valueOf(tableName),
-      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
-
-    // The input below exercises several cases at once:
-    // 1. Row keys are not in order
-    // 2. Qualifiers are not in order
-    // 3. Column families are not in order
-    // 4. Some rows have records in one column family and some in two column families
-    // 5. Some rows have a single qualifier and some have two
-    val rdd = sc.parallelize(Array(
-      ("1",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
-      ("3",
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))),
-      ("3",
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))),
-      ("3",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))),
-      ("5",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
-      ("4",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
-      ("4",
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
-      ("2",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
-      ("2",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))).
-      groupByKey()  // one element per String row key, carrying all of that row's (family, qualifier, value) cells
-
-    val hbaseContext = new HBaseContext(sc, config)
-
-    testFolder.create()  // TemporaryFolder is driven by hand; the matching delete() is in the finally
-    val stagingFolder = testFolder.newFolder()
-
-    hbaseContext.bulkLoadThinRows[(String, Iterable[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
-      TableName.valueOf(tableName),
-      t => {  // maps one grouped row to (ByteArrayWrapper(rowKey), all of its cells)
-        val rowKey = Bytes.toBytes(t._1)
-
-        val familyQualifiersValues = new FamiliesQualifiersValues
-        t._2.foreach(f => {
-          val family:Array[Byte] = f._1
-          val qualifier = f._2
-          val value:Array[Byte] = f._3
-
-          familyQualifiersValues +=(family, qualifier, value)
-        })
-        (new ByteArrayWrapper(rowKey), familyQualifiersValues)
-      },
-      stagingFolder.getPath)
-
-    val fs = FileSystem.get(config)
-    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)  // presumably one folder per family (f1, f2)
-
-    val conn = ConnectionFactory.createConnection(config)  // NOTE(review): conn is never closed in this test
-
-    val load = new LoadIncrementalHFiles(config)
-    val table = conn.getTable(TableName.valueOf(tableName))
-    try {
-      load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table,
-        conn.getRegionLocator(TableName.valueOf(tableName)))
-
-      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
-      assert(cells5.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
-
-      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
-      assert(cells4.size == 2)  // asserted order below: f1:a, f2:b
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
-
-      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
-      assert(cells3.size == 3)  // asserted order below: f1:a, f2:a, f2:b (input order was f2:b, f2:a, f1:a)
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
-
-
-      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
-      assert(cells2.size == 2)
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
-
-      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
-      assert(cells1.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
-
-    } finally {  // cleanup runs even when an assertion above fails
-      table.close()
-      val admin = ConnectionFactory.createConnection(config).getAdmin  // NOTE(review): second connection, never closed explicitly
-      try {
-        admin.disableTable(TableName.valueOf(tableName))
-        admin.deleteTable(TableName.valueOf(tableName))
-      } finally {
-        admin.close()
-      }
-      fs.delete(new Path(stagingFolder.getPath), true)
-
-      testFolder.delete()
-
-    }
-  }
-
-  test("Thin Row Bulk Load: Test HBase client: Test Roll Over and " +
-    "using an implicit call to bulk load") {
-    val config = TEST_UTIL.getConfiguration
-
-    logInfo(" - creating table " + tableName)
-    TEST_UTIL.createTable(TableName.valueOf(tableName),
-      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
-
-    //There are a number of tests in here.
-    // 1. Row keys are not in order
-    // 2. Qualifiers are not in order
-    // 3. Column Families are not in order
-    // 4. There are tests for records in one column family and some in two column families
-    // 5. There are records will a single qualifier and some with two
-    val rdd = sc.parallelize(Array(
-      ("1",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
-      ("3",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))),
-      ("3",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a"))),
-      ("3",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c"))),
-      ("5",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
-      ("4",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
-      ("4",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
-      ("2",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
-      ("2",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))).
-      groupByKey()
-
-    val hbaseContext = new HBaseContext(sc, config)
-
-    testFolder.create()
-    val stagingFolder = testFolder.newFolder()
-
-    rdd.hbaseBulkLoadThinRows(hbaseContext,
-      TableName.valueOf(tableName),
-      t => {
-        val rowKey = t._1
-
-        val familyQualifiersValues = new FamiliesQualifiersValues
-        t._2.foreach(f => {
-          val family:Array[Byte] = f._1
-          val qualifier = f._2
-          val value:Array[Byte] = f._3
-
-          familyQualifiersValues +=(family, qualifier, value)
-        })
-        (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
-      },
-      stagingFolder.getPath,
-      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
-      compactionExclude = false,
-      20)
-
-    val fs = FileSystem.get(config)
-    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 1)
-
-    assert(fs.listStatus(new Path(stagingFolder.getPath+ "/f1")).length == 5)
-
-    val conn = ConnectionFactory.createConnection(config)
-
-    val load = new LoadIncrementalHFiles(config)
-    val table = conn.getTable(TableName.valueOf(tableName))
-    try {
-      load.doBulkLoad(new Path(stagingFolder.getPath),
-        conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
-
-      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
-      assert(cells5.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
-
-      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
-      assert(cells4.size == 2)
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
-
-      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
-      assert(cells3.size == 3)
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.a"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("b"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.c"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("c"))
-
-      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
-      assert(cells2.size == 2)
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
-
-      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
-      assert(cells1.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
-
-    } finally {
-      table.close()
-      val admin = ConnectionFactory.createConnection(config).getAdmin
-      try {
-        admin.disableTable(TableName.valueOf(tableName))
-        admin.deleteTable(TableName.valueOf(tableName))
-      } finally {
-        admin.close()
-      }
-      fs.delete(new Path(stagingFolder.getPath), true)
-
-      testFolder.delete()
-    }
-  }
-
-  test("Thin Row Bulk Load: Test multi family and multi column tests" +
-    " with one column family with custom configs plus multi region") {
-    val config = TEST_UTIL.getConfiguration
-
-    val splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](2)
-    splitKeys(0) = Bytes.toBytes("2")
-    splitKeys(1) = Bytes.toBytes("4")
-
-    logInfo(" - creating table " + tableName)
-    TEST_UTIL.createTable(TableName.valueOf(tableName),
-      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)),
-      splitKeys)
-
-    //There are a number of tests in here.
-    // 1. Row keys are not in order
-    // 2. Qualifiers are not in order
-    // 3. Column Families are not in order
-    // 4. There are tests for records in one column family and some in two column families
-    // 5. There are records will a single qualifier and some with two
-    val rdd = sc.parallelize(Array(
-      ("1",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
-      ("3",
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))),
-      ("3",
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))),
-      ("3",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))),
-      ("5",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
-      ("4",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
-      ("4",
-        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
-      ("2",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
-      ("2",
-        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))).
-      groupByKey()
-
-    val hbaseContext = new HBaseContext(sc, config)
-
-    testFolder.create()
-    val stagingFolder = testFolder.newFolder()
-
-    val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
-
-    val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128,
-      "PREFIX")
-
-    familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
-
-    hbaseContext.bulkLoadThinRows[(String, Iterable[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
-      TableName.valueOf(tableName),
-      t => {
-        val rowKey = t._1
-
-        val familyQualifiersValues = new FamiliesQualifiersValues
-        t._2.foreach(f => {
-          val family:Array[Byte] = f._1
-          val qualifier = f._2
-          val value:Array[Byte] = f._3
-
-          familyQualifiersValues +=(family, qualifier, value)
-        })
-        (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
-      },
-      stagingFolder.getPath,
-      familyHBaseWriterOptions,
-      compactionExclude = false,
-      HConstants.DEFAULT_MAX_FILE_SIZE)
-
-    val fs = FileSystem.get(config)
-    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)
-
-    val f1FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f1"))
-    for ( i <- 0 until f1FileList.length) {
-      val reader = HFile.createReader(fs, f1FileList(i).getPath,
-        new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("gz"))
-      assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
-    }
-
-    assert( 3 ==  f1FileList.length)
-
-    val f2FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f2"))
-    for ( i <- 0 until f2FileList.length) {
-      val reader = HFile.createReader(fs, f2FileList(i).getPath,
-        new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("none"))
-      assert(reader.getDataBlockEncoding.name().equals("NONE"))
-    }
-
-    assert( 2 ==  f2FileList.length)
-
-
-    val conn = ConnectionFactory.createConnection(config)
-
-    val load = new LoadIncrementalHFiles(config)
-    val table = conn.getTable(TableName.valueOf(tableName))
-    try {
-      load.doBulkLoad(new Path(stagingFolder.getPath),
-        conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
-
-      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
-      assert(cells5.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
-
-      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
-      assert(cells4.size == 2)
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
-
-      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
-      assert(cells3.size == 3)
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
-
-
-      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
-      assert(cells2.size == 2)
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
-      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
-
-      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
-      assert(cells1.size == 1)
-      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
-      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
-      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
-
-    } finally {
-      table.close()
-      val admin = ConnectionFactory.createConnection(config).getAdmin
-      try {
-        admin.disableTable(TableName.valueOf(tableName))
-        admin.deleteTable(TableName.valueOf(tableName))
-      } finally {
-        admin.close()
-      }
-      fs.delete(new Path(stagingFolder.getPath), true)
-
-      testFolder.delete()
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
deleted file mode 100644
index 3bce041..0000000
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ /dev/null
@@ -1,1040 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
-import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
-import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{Logging, SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-
-case class HBaseRecord(
-  col0: String,
-  col1: Boolean,
-  col2: Double,
-  col3: Float,
-  col4: Int,
-  col5: Long,
-  col6: Short,
-  col7: String,
-  col8: Byte)
-
-object HBaseRecord {
-  def apply(i: Int, t: String): HBaseRecord = {
-    val s = s"""row${"%03d".format(i)}"""
-    HBaseRecord(s,
-      i % 2 == 0,
-      i.toDouble,
-      i.toFloat,
-      i,
-      i.toLong,
-      i.toShort,
-      s"String$i: $t",
-      i.toByte)
-  }
-}
-
-
-case class AvroHBaseKeyRecord(col0: Array[Byte],
-                              col1: Array[Byte])
-
-object AvroHBaseKeyRecord {
-  val schemaString =
-    s"""{"namespace": "example.avro",
-        |   "type": "record",      "name": "User",
-        |    "fields": [      {"name": "name", "type": "string"},
-        |      {"name": "favorite_number",  "type": ["int", "null"]},
-        |        {"name": "favorite_color", "type": ["string", "null"]}      ]    }""".stripMargin
-
-  val avroSchema: Schema = {
-    val p = new Schema.Parser
-    p.parse(schemaString)
-  }
-
-  def apply(i: Int): AvroHBaseKeyRecord = {
-    val user = new GenericData.Record(avroSchema);
-    user.put("name", s"name${"%03d".format(i)}")
-    user.put("favorite_number", i)
-    user.put("favorite_color", s"color${"%03d".format(i)}")
-    val avroByte = AvroSerdes.serialize(user, avroSchema)
-    AvroHBaseKeyRecord(avroByte, avroByte)
-  }
-}
-
-class DefaultSourceSuite extends FunSuite with
-BeforeAndAfterEach with BeforeAndAfterAll with Logging {
-  @transient var sc: SparkContext = null
-  var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
-
-  val t1TableName = "t1"
-  val t2TableName = "t2"
-  val columnFamily = "c"
-
-  var sqlContext:SQLContext = null
-  var df:DataFrame = null
-
-  override def beforeAll() {
-
-    TEST_UTIL.startMiniCluster
-
-    logInfo(" - minicluster started")
-    try
-      TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
-    catch {
-      case e: Exception => logInfo(" - no table " + t1TableName + " found")
-    }
-    try
-      TEST_UTIL.deleteTable(TableName.valueOf(t2TableName))
-    catch {
-      case e: Exception => logInfo(" - no table " + t2TableName + " found")
-    }
-    logInfo(" - creating table " + t1TableName)
-    TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily))
-    logInfo(" - created table")
-    logInfo(" - creating table " + t2TableName)
-    TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
-    logInfo(" - created table")
-    val sparkConf = new SparkConf
-    sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
-    sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
-    sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
-
-    sc  = new SparkContext("local", "test", sparkConf)
-
-    val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
-    try {
-      val t1Table = connection.getTable(TableName.valueOf("t1"))
-
-      try {
-        var put = new Put(Bytes.toBytes("get1"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
-        t1Table.put(put)
-        put = new Put(Bytes.toBytes("get2"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
-        t1Table.put(put)
-        put = new Put(Bytes.toBytes("get3"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
-        t1Table.put(put)
-        put = new Put(Bytes.toBytes("get4"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
-        t1Table.put(put)
-        put = new Put(Bytes.toBytes("get5"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
-        t1Table.put(put)
-      } finally {
-        t1Table.close()
-      }
-
-      val t2Table = connection.getTable(TableName.valueOf("t2"))
-
-      try {
-        var put = new Put(Bytes.toBytes(1))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
-        t2Table.put(put)
-        put = new Put(Bytes.toBytes(2))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
-        t2Table.put(put)
-        put = new Put(Bytes.toBytes(3))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
-        t2Table.put(put)
-        put = new Put(Bytes.toBytes(4))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
-        t2Table.put(put)
-        put = new Put(Bytes.toBytes(5))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
-        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
-        t2Table.put(put)
-      } finally {
-        t2Table.close()
-      }
-    } finally {
-      connection.close()
-    }
-
-    def hbaseTable1Catalog = s"""{
-            |"table":{"namespace":"default", "name":"t1"},
-            |"rowkey":"key",
-            |"columns":{
-              |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
-              |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
-              |"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
-            |}
-          |}""".stripMargin
-
-    new HBaseContext(sc, TEST_UTIL.getConfiguration)
-    sqlContext = new SQLContext(sc)
-
-    df = sqlContext.load("org.apache.hadoop.hbase.spark",
-      Map(HBaseTableCatalog.tableCatalog->hbaseTable1Catalog))
-
-    df.registerTempTable("hbaseTable1")
-
-    def hbaseTable2Catalog = s"""{
-            |"table":{"namespace":"default", "name":"t2"},
-            |"rowkey":"key",
-            |"columns":{
-              |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"int"},
-              |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
-              |"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
-            |}
-          |}""".stripMargin
-
-
-    df = sqlContext.load("org.apache.hadoop.hbase.spark",
-      Map(HBaseTableCatalog.tableCatalog->hbaseTable2Catalog))
-
-    df.registerTempTable("hbaseTable2")
-  }
-
-  override def afterAll() {
-    TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
-    logInfo("shuting down minicluster")
-    TEST_UTIL.shutdownMiniCluster()
-
-    sc.stop()
-  }
-
-  override def beforeEach(): Unit = {
-    DefaultSourceStaticUtils.lastFiveExecutionRules.clear()
-  }
-
-
-  /**
-   * A example of query three fields and also only using rowkey points for the filter
-   */
-  test("Test rowKey point only rowKey query") {
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
-      "WHERE " +
-      "(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')").take(10)
-
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(results.length == 3)
-
-    assert(executionRules.dynamicLogicExpression.toExpressionString.
-      equals("( ( KEY_FIELD == 0 OR KEY_FIELD == 1 ) OR KEY_FIELD == 2 )"))
-
-    assert(executionRules.rowKeyFilter.points.size == 3)
-    assert(executionRules.rowKeyFilter.ranges.size == 0)
-  }
-
-  /**
-   * A example of query three fields and also only using cell points for the filter
-   */
-  test("Test cell point only rowKey query") {
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
-      "WHERE " +
-      "(B_FIELD = '4' or B_FIELD = '10' or A_FIELD = 'foo1')").take(10)
-
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(results.length == 3)
-
-    assert(executionRules.dynamicLogicExpression.toExpressionString.
-      equals("( ( B_FIELD == 0 OR B_FIELD == 1 ) OR A_FIELD == 2 )"))
-  }
-
-  /**
-   * A example of a OR merge between to ranges the result is one range
-   * Also an example of less then and greater then
-   */
-  test("Test two range rowKey query") {
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
-      "WHERE " +
-      "( KEY_FIELD < 'get2' or KEY_FIELD > 'get3')").take(10)
-
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(results.length == 3)
-
-    assert(executionRules.dynamicLogicExpression.toExpressionString.
-      equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
-
-    assert(executionRules.rowKeyFilter.points.size == 0)
-    assert(executionRules.rowKeyFilter.ranges.size == 2)
-
-    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
-    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
-    assert(Bytes.equals(scanRange1.upperBound,Bytes.toBytes("get2")))
-    assert(scanRange1.isLowerBoundEqualTo)
-    assert(!scanRange1.isUpperBoundEqualTo)
-
-    val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
-    assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes("get3")))
-    assert(scanRange2.upperBound == null)
-    assert(!scanRange2.isLowerBoundEqualTo)
-    assert(scanRange2.isUpperBoundEqualTo)
-  }
-
-  /**
-   * A example of a OR merge between to ranges the result is one range
-   * Also an example of less then and greater then
-   *
-   * This example makes sure the code works for a int rowKey
-   */
-  test("Test two range rowKey query where the rowKey is Int and there is a range over lap") {
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
-      "WHERE " +
-      "( KEY_FIELD < 4 or KEY_FIELD > 2)").take(10)
-
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-
-
-    assert(executionRules.dynamicLogicExpression.toExpressionString.
-      equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
-
-    assert(executionRules.rowKeyFilter.points.size == 0)
-    assert(executionRules.rowKeyFilter.ranges.size == 2)
-    assert(results.length == 5)
-  }
-
-  /**
-   * A example of a OR merge between to ranges the result is two ranges
-   * Also an example of less then and greater then
-   *
-   * This example makes sure the code works for a int rowKey
-   */
-  test("Test two range rowKey query where the rowKey is Int and the ranges don't over lap") {
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
-      "WHERE " +
-      "( KEY_FIELD < 2 or KEY_FIELD > 4)").take(10)
-
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(executionRules.dynamicLogicExpression.toExpressionString.
-      equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
-
-    assert(executionRules.rowKeyFilter.points.size == 0)
-
-    assert(executionRules.rowKeyFilter.ranges.size == 3)
-
-    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
-    assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
-    assert(scanRange1.isLowerBoundEqualTo)
-    assert(!scanRange1.isUpperBoundEqualTo)
-
-    val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
-    assert(scanRange2.isUpperBoundEqualTo)
-
-    assert(results.length == 2)
-  }
-
-  /**
-   * A example of a AND merge between to ranges the result is one range
-   * Also an example of less then and equal to and greater then and equal to
-   */
-  test("Test one combined range rowKey query") {
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
-      "WHERE " +
-      "(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
-
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(results.length == 2)
-
-    assert(executionRules.dynamicLogicExpression.toExpressionString.
-      equals("( KEY_FIELD <= 0 AND KEY_FIELD >= 1 )"))
-
-    assert(executionRules.rowKeyFilter.points.size == 0)
-    assert(executionRules.rowKeyFilter.ranges.size == 1)
-
-    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
-    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get2")))
-    assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get3")))
-    assert(scanRange1.isLowerBoundEqualTo)
-    assert(scanRange1.isUpperBoundEqualTo)
-
-  }
-
-  /**
-   * Do a select with no filters
-   */
-  test("Test select only query") {
-
-    val results = df.select("KEY_FIELD").take(10)
-    assert(results.length == 5)
-
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(executionRules.dynamicLogicExpression == null)
-
-  }
-
-  /**
-   * A complex query with one point and one range for both the
-   * rowKey and the a column
-   */
-  test("Test SQL point and range combo") {
-    val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTable1 " +
-      "WHERE " +
-      "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
-      "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
-
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(executionRules.dynamicLogicExpression.toExpressionString.
-      equals("( ( KEY_FIELD == 0 AND B_FIELD < 1 ) OR " +
-      "( KEY_FIELD >= 2 AND B_FIELD == 3 ) )"))
-
-    assert(executionRules.rowKeyFilter.points.size == 1)
-    assert(executionRules.rowKeyFilter.ranges.size == 1)
-
-    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
-    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get3")))
-    assert(scanRange1.upperBound == null)
-    assert(scanRange1.isLowerBoundEqualTo)
-    assert(scanRange1.isUpperBoundEqualTo)
-
-
-    assert(results.length == 3)
-  }
-
-  /**
-   * A complex query with two complex ranges that doesn't merge into one
-   */
-  test("Test two complete range non merge rowKey query") {
-
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
-      "WHERE " +
-      "( KEY_FIELD >= 1 and KEY_FIELD <= 2) or" +
-      "( KEY_FIELD > 3 and KEY_FIELD <= 5)").take(10)
-
-
-    assert(results.length == 4)
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-    assert(executionRules.dynamicLogicExpression.toExpressionString.
-      equals("( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
-      "( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"))
-
-    assert(executionRules.rowKeyFilter.points.size == 0)
-    assert(executionRules.rowKeyFilter.ranges.size == 2)
-
-    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
-    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes(1)))
-    assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
-    assert(scanRange1.isLowerBoundEqualTo)
-    assert(scanRange1.isUpperBoundEqualTo)
-
-    val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
-    assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes(3)))
-    assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes(5)))
-    assert(!scanRange2.isLowerBoundEqualTo)
-    assert(scanRange2.isUpperBoundEqualTo)
-
-  }
-
-  /**
-   * A complex query with two row-key ranges over string keys. The assertions below expect
-   * both ranges to reach the row-key filter as two separate scan ranges.
-   */
-  test("Test two complete range merge rowKey query") {
-    val query = "SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
-      "WHERE " +
-      "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" +
-      "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')"
-    val rows = sqlContext.sql(query).take(10)
-
-    val rules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(rows.length == 4)
-
-    // The pushed-down expression refers to its literal operands by position (0..3).
-    val expectedExpression = "( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
-      "( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"
-    assert(rules.dynamicLogicExpression.toExpressionString.equals(expectedExpression))
-
-    val keyFilter = rules.rowKeyFilter
-    assert(keyFilter.points.size == 0)
-    assert(keyFilter.ranges.size == 2)
-
-    // First range: ['get1', 'get2'], inclusive on both ends.
-    val firstRange = keyFilter.ranges.get(0).get
-    assert(Bytes.equals(firstRange.lowerBound, Bytes.toBytes("get1")))
-    assert(Bytes.equals(firstRange.upperBound, Bytes.toBytes("get2")))
-    assert(firstRange.isLowerBoundEqualTo)
-    assert(firstRange.isUpperBoundEqualTo)
-
-    // Second range: ('get3', 'get5'], exclusive lower bound and inclusive upper bound.
-    val secondRange = keyFilter.ranges.get(1).get
-    assert(Bytes.equals(secondRange.lowerBound, Bytes.toBytes("get3")))
-    assert(Bytes.equals(secondRange.upperBound, Bytes.toBytes("get5")))
-    assert(!secondRange.isLowerBoundEqualTo)
-    assert(secondRange.isUpperBoundEqualTo)
-  }
-
-  // Row-key predicates OR-ed with column predicates: the row key must not be restricted,
-  // so a single open-ended scan range is expected.
-  test("Test OR logic with a one RowKey and One column") {
-    val query = "SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
-      "WHERE " +
-      "( KEY_FIELD >= 'get1' or A_FIELD <= 'foo2') or" +
-      "( KEY_FIELD > 'get3' or B_FIELD <= '4')"
-    val rows = sqlContext.sql(query).take(10)
-
-    val rules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(rows.length == 5)
-
-    val expectedExpression = "( ( KEY_FIELD >= 0 OR A_FIELD <= 1 ) OR " +
-      "( KEY_FIELD > 2 OR B_FIELD <= 3 ) )"
-    assert(rules.dynamicLogicExpression.toExpressionString.equals(expectedExpression))
-
-    val keyFilter = rules.rowKeyFilter
-    assert(keyFilter.points.size == 0)
-    assert(keyFilter.ranges.size == 1)
-
-    // This is the main check for 14406: the key is joined to a qualifier through an OR,
-    // so there is no filter on the row key (empty lower bound, null upper bound).
-    val onlyRange = keyFilter.ranges.get(0).get
-    assert(Bytes.equals(onlyRange.lowerBound, Bytes.toBytes("")))
-    assert(onlyRange.upperBound == null)
-    assert(onlyRange.isLowerBoundEqualTo)
-    assert(onlyRange.isUpperBoundEqualTo)
-  }
-
-  // Only column predicates, all OR-ed: the expression is pushed down while the row-key
-  // filter is expected to hold a single open-ended range.
-  test("Test OR logic with a two columns") {
-    val query = "SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
-      "WHERE " +
-      "( B_FIELD > '4' or A_FIELD <= 'foo2') or" +
-      "( A_FIELD > 'foo2' or B_FIELD < '4')"
-    val rows = sqlContext.sql(query).take(10)
-
-    val rules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(rows.length == 5)
-
-    val expectedExpression = "( ( B_FIELD > 0 OR A_FIELD <= 1 ) OR " +
-      "( A_FIELD > 2 OR B_FIELD < 3 ) )"
-    assert(rules.dynamicLogicExpression.toExpressionString.equals(expectedExpression))
-
-    val keyFilter = rules.rowKeyFilter
-    assert(keyFilter.points.size == 0)
-    assert(keyFilter.ranges.size == 1)
-
-    // Empty lower bound and null upper bound: the row key is not restricted.
-    val onlyRange = keyFilter.ranges.get(0).get
-    assert(Bytes.equals(onlyRange.lowerBound, Bytes.toBytes("")))
-    assert(onlyRange.upperBound == null)
-    assert(onlyRange.isLowerBoundEqualTo)
-    assert(onlyRange.isUpperBoundEqualTo)
-  }
-
-  // One row-key predicate OR-ed with one column predicate: again a single open-ended
-  // scan range is expected.
-  test("Test single RowKey Or Column logic") {
-    val query = "SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
-      "WHERE " +
-      "( KEY_FIELD >= 'get4' or A_FIELD <= 'foo2' )"
-    val rows = sqlContext.sql(query).take(10)
-
-    val rules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(rows.length == 4)
-
-    val expectedExpression = "( KEY_FIELD >= 0 OR A_FIELD <= 1 )"
-    assert(rules.dynamicLogicExpression.toExpressionString.equals(expectedExpression))
-
-    val keyFilter = rules.rowKeyFilter
-    assert(keyFilter.points.size == 0)
-    assert(keyFilter.ranges.size == 1)
-
-    // Empty lower bound and null upper bound: the row key is not restricted.
-    val onlyRange = keyFilter.ranges.get(0).get
-    assert(Bytes.equals(onlyRange.lowerBound, Bytes.toBytes("")))
-    assert(onlyRange.upperBound == null)
-    assert(onlyRange.isLowerBoundEqualTo)
-    assert(onlyRange.isUpperBoundEqualTo)
-  }
-
-  // Loading and querying through a catalog that names table "t1NotThere" (per the test
-  // name, a table that does not exist) is expected to raise an exception.
-  test("Test table that doesn't exist") {
-    val catalog = s"""{
-      |"table":{"namespace":"default", "name":"t1NotThere"},
-      |"rowkey":"key",
-      |"columns":{
-        |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
-        |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
-        |"B_FIELD":{"cf":"c", "col":"c", "type":"string"}
-      |}
-    |}""".stripMargin
-
-    intercept[Exception] {
-      // DataFrameReader replaces the deprecated SQLContext.load(source, options) and
-      // matches how withCatalog in this suite builds its DataFrame.
-      df = sqlContext.read
-        .options(Map(HBaseTableCatalog.tableCatalog->catalog))
-        .format("org.apache.hadoop.hbase.spark")
-        .load()
-
-      df.registerTempTable("hbaseNonExistingTmp")
-
-      sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseNonExistingTmp " +
-        "WHERE " +
-        "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
-        "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").count()
-    }
-    // Take (and discard) the head of the execution-rules queue.
-    // NOTE(review): presumably so a recorded entry does not leak into later tests -- confirm.
-    DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-  }
-
-
-  // The catalog declares an extra C_FIELD (c:c) that the query never selects; the query
-  // is expected to return all 5 rows with no pushed-down logic expression.
-  test("Test table with column that doesn't exist") {
-    val catalog = s"""{
-      |"table":{"namespace":"default", "name":"t1"},
-      |"rowkey":"key",
-      |"columns":{
-        |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
-        |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
-        |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
-        |"C_FIELD":{"cf":"c", "col":"c", "type":"string"}
-      |}
-    |}""".stripMargin
-    // DataFrameReader replaces the deprecated SQLContext.load(source, options) and
-    // matches how withCatalog in this suite builds its DataFrame.
-    df = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog->catalog))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-
-    df.registerTempTable("hbaseFactColumnTmp")
-
-    val result = sqlContext.sql("SELECT KEY_FIELD, " +
-      "B_FIELD, A_FIELD FROM hbaseFactColumnTmp")
-
-    assert(result.count() == 5)
-
-    // No WHERE clause, so nothing should have been pushed down.
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-    assert(executionRules.dynamicLogicExpression == null)
-  }
-
-  // Declares I_FIELD (c:i) as "int" and filters on it with integer literals; two rows
-  // are expected, the first one carrying the value 8.
-  test("Test table with INT column") {
-    val catalog = s"""{
-      |"table":{"namespace":"default", "name":"t1"},
-      |"rowkey":"key",
-      |"columns":{
-        |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
-        |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
-        |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
-        |"I_FIELD":{"cf":"c", "col":"i", "type":"int"}
-      |}
-    |}""".stripMargin
-    // DataFrameReader replaces the deprecated SQLContext.load(source, options) and
-    // matches how withCatalog in this suite builds its DataFrame.
-    df = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog->catalog))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-
-    df.registerTempTable("hbaseIntTmp")
-
-    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntTmp"+
-    " where I_FIELD > 4 and I_FIELD < 10")
-
-    val localResult = result.take(5)
-
-    assert(localResult.length == 2)
-    assert(localResult(0).getInt(2) == 8)
-
-    // The pushed-down expression refers to its two literal operands by position.
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-    assert(executionRules.dynamicLogicExpression.toExpressionString.
-      equals("( I_FIELD > 0 AND I_FIELD < 1 )"))
-  }
-
-  // Declares I_FIELD (c:i) as "string" (the "Test table with INT column" catalog declares
-  // the same cell as "int"). The value then comes back as a 4-character string whose
-  // characters are the raw bytes 0, 0, 0, 1.
-  test("Test table with INT column defined at wrong type") {
-    val catalog = s"""{
-      |"table":{"namespace":"default", "name":"t1"},
-      |"rowkey":"key",
-      |"columns":{
-        |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
-        |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
-        |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
-        |"I_FIELD":{"cf":"c", "col":"i", "type":"string"}
-      |}
-    |}""".stripMargin
-    // DataFrameReader replaces the deprecated SQLContext.load(source, options) and
-    // matches how withCatalog in this suite builds its DataFrame.
-    df = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog->catalog))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-
-    df.registerTempTable("hbaseIntWrongTypeTmp")
-
-    val result = sqlContext.sql("SELECT KEY_FIELD, " +
-      "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
-
-    val localResult = result.take(10)
-    assert(localResult.length == 5)
-
-    // No WHERE clause, so nothing should have been pushed down.
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-    assert(executionRules.dynamicLogicExpression == null)
-
-    // Read the mis-typed cell once and check it byte by byte.
-    val rawValue = localResult(0).getString(2)
-    assert(rawValue.length == 4)
-    assert(rawValue.charAt(0).toByte == 0)
-    assert(rawValue.charAt(1).toByte == 0)
-    assert(rawValue.charAt(2).toByte == 0)
-    assert(rawValue.charAt(3).toByte == 1)
-  }
-
-  // A catalog whose KEY_FIELD type is the unknown name "FOOBAR" must make loading or
-  // querying the relation fail.
-  test("Test bad column type") {
-    val catalog = s"""{
-      |"table":{"namespace":"default", "name":"t1"},
-      |"rowkey":"key",
-      |"columns":{
-        |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"FOOBAR"},
-        |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
-        |"I_FIELD":{"cf":"c", "col":"i", "type":"string"}
-      |}
-    |}""".stripMargin
-    // Only the operations expected to throw live inside intercept. Assertions are kept
-    // out because a failed assert is itself an Exception and would satisfy the intercept,
-    // letting the test pass for the wrong reason.
-    intercept[Exception] {
-      df = sqlContext.read
-        .options(Map(HBaseTableCatalog.tableCatalog->catalog))
-        .format("org.apache.hadoop.hbase.spark")
-        .load()
-
-      df.registerTempTable("hbaseIntWrongTypeTmp")
-
-      // Select only columns this catalog declares, so an unresolved-column error cannot
-      // stand in for the bad-type failure under test.
-      sqlContext.sql("SELECT KEY_FIELD, " +
-        "A_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp").take(10)
-    }
-  }
-
-  // HBaseTestSource is loaded with cacheSize=100, batchNum=100, blockCacheingEnable=true
-  // and must count 10 rows; changing any one of those three options must make count() throw.
-  test("Test HBaseSparkConf matching") {
-    // Loads the test source with the given settings; rowNum is always "10".
-    // DataFrameReader replaces the deprecated SQLContext.load(source, options).
-    def loadTestSource(cacheSize: String, batchNum: String, blockCaching: String): DataFrame =
-      sqlContext.read
-        .options(Map("cacheSize" -> cacheSize,
-          "batchNum" -> batchNum,
-          "blockCacheingEnable" -> blockCaching, "rowNum" -> "10"))
-        .format("org.apache.hadoop.hbase.spark.HBaseTestSource")
-        .load()
-
-    val matching = loadTestSource("100", "100", "true")
-    assert(matching.count() == 10)
-
-    // Each frame is loaded outside intercept (loading itself must succeed) and only
-    // count() is wrapped. No assert is placed inside intercept: a failed assert is an
-    // Exception too and would let the test pass on a wrong count instead of a throw.
-    val wrongCacheSize = loadTestSource("1000", "100", "true")
-    intercept[Exception] {
-      wrongCacheSize.count()
-    }
-
-    val wrongBatchNum = loadTestSource("100", "1000", "true")
-    intercept[Exception] {
-      wrongBatchNum.count()
-    }
-
-    val wrongBlockCaching = loadTestSource("100", "100", "false")
-    intercept[Exception] {
-      wrongBlockCaching.count()
-    }
-  }
-
-  // Z_FIELD (c:z) is expected to be populated on only some rows: of the 5 rows returned,
-  // the second holds "FOO", the fourth "BAR", and the others null.
-  test("Test table with sparse column") {
-    val catalog = s"""{
-      |"table":{"namespace":"default", "name":"t1"},
-      |"rowkey":"key",
-      |"columns":{
-        |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
-        |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
-        |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
-        |"Z_FIELD":{"cf":"c", "col":"z", "type":"string"}
-      |}
-    |}""".stripMargin
-    // DataFrameReader replaces the deprecated SQLContext.load(source, options) and
-    // matches how withCatalog in this suite builds its DataFrame.
-    df = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog->catalog))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-
-    df.registerTempTable("hbaseZTmp")
-
-    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, Z_FIELD FROM hbaseZTmp")
-
-    val localResult = result.take(10)
-    assert(localResult.length == 5)
-
-    assert(localResult(0).getString(2) == null)
-    assert(localResult(1).getString(2) == "FOO")
-    assert(localResult(2).getString(2) == null)
-    assert(localResult(3).getString(2) == "BAR")
-    assert(localResult(4).getString(2) == null)
-
-    // No WHERE clause, so nothing should have been pushed down.
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-    assert(executionRules.dynamicLogicExpression == null)
-  }
-
-  // With HBaseSparkConf.PUSHDOWN_COLUMN_FILTER set to "false" the query must still return
-  // its 2 rows, but no dynamic logic expression is expected on the execution rules.
-  test("Test with column logic disabled") {
-    val catalog = s"""{
-      |"table":{"namespace":"default", "name":"t1"},
-      |"rowkey":"key",
-      |"columns":{
-        |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
-        |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
-        |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
-        |"Z_FIELD":{"cf":"c", "col":"z", "type":"string"}
-      |}
-    |}""".stripMargin
-    // DataFrameReader replaces the deprecated SQLContext.load(source, options) and
-    // matches how withCatalog in this suite builds its DataFrame.
-    df = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog->catalog,
-        HBaseSparkConf.PUSHDOWN_COLUMN_FILTER -> "false"))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-
-    df.registerTempTable("hbaseNoPushDownTmp")
-
-    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseNoPushDownTmp " +
-      "WHERE " +
-      "(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
-
-    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-
-    assert(results.length == 2)
-
-    assert(executionRules.dynamicLogicExpression == null)
-  }
-
-  // JSON catalog for table "table1": a string row key (col0) plus one column per
-  // primitive type (boolean, double, float, int, bigint, smallint, string, tinyint).
-  def writeCatalog =
-    """{
-      |"table":{"namespace":"default", "name":"table1"},
-      |"rowkey":"key",
-      |"columns":{
-      |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
-      |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
-      |"col2":{"cf":"cf1", "col":"col2", "type":"double"},
-      |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
-      |"col4":{"cf":"cf3", "col":"col4", "type":"int"},
-      |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
-      |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
-      |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
-      |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
-      |}
-      |}""".stripMargin
-
-  /**
-   * Loads a DataFrame through the "org.apache.hadoop.hbase.spark" data source.
-   *
-   * @param cat JSON table catalog, passed under the HBaseTableCatalog.tableCatalog option
-   * @return the DataFrame produced by the reader
-   */
-  def withCatalog(cat: String): DataFrame = {
-    val readerOptions = Map(HBaseTableCatalog.tableCatalog->cat)
-    val reader = sqlContext.read.options(readerOptions)
-    reader.format("org.apache.hadoop.hbase.spark").load()
-  }
-
-  // Writes 256 HBaseRecord rows (i = 0..255, tagged "extra") through writeCatalog,
-  // passing HBaseTableCatalog.newTable -> "5" to the writer.
-  test("populate table") {
-    val sql = sqlContext
-    import sql.implicits._
-    val records = for (i <- 0 to 255) yield HBaseRecord(i, "extra")
-    val writeOptions = Map(
-      HBaseTableCatalog.tableCatalog -> writeCatalog,
-      HBaseTableCatalog.newTable -> "5")
-    sc.parallelize(records).toDF.write
-      .options(writeOptions)
-      .format("org.apache.hadoop.hbase.spark")
-      .save()
-  }
-
-  // count(1) selects no table column; the full row count of 256 is still expected.
-  test("empty column") {
-    withCatalog(writeCatalog).registerTempTable("table0")
-    val countRow = sqlContext.sql("select count(1) from table0").rdd.collect()(0)
-    val c = countRow(0).asInstanceOf[Long]
-    assert(c == 256)
-  }
-
-  // Unfiltered read through writeCatalog: prints the frame and expects 256 rows.
-  test("full query") {
-    val table = withCatalog(writeCatalog)
-    table.show()
-    assert(table.count() == 256)
-  }
-
-  // Row-key filter col0 <= "row005" with a two-column projection; 6 rows are expected.
-  test("filtered query0") {
-    val sql = sqlContext
-    import sql.implicits._
-    val selected = withCatalog(writeCatalog)
-      .filter($"col0" <= "row005")
-      .select("col0", "col1")
-    selected.show()
-    assert(selected.count() == 6)
-  }
-
-  // Exercises the HBaseSparkConf.TIMESTAMP and TIMERANGE_START/TIMERANGE_END options:
-  // rows 0..100 are written with an explicit old timestamp, rows 200..255 with the
-  // implicit (current) one, and the table is then read back under several time selections.
-  // The statement order matters: startMs must be captured before either write.
-  test("Timestamp semantics") {
-    val sql = sqlContext
-    import sql.implicits._
-
-    // There's already some data in here from recently. Let's throw something in
-    // from 1993 which we can include/exclude and add some data with the implicit (now) timestamp.
-    // Then we should be able to cross-section it and only get points in between, get the most recent view
-    // and get an old view.
-    // NOTE(review): the pre-existing "extra" rows presumably come from the
-    // "populate table" test running first -- confirm the suite's test ordering.
-    val oldMs = 754869600000L
-    // Wall-clock time before the writes below; used later as the "middle" cut-off.
-    val startMs = System.currentTimeMillis()
-    // 101 records (0..100) tagged "old".
-    val oldData = (0 to 100).map { i =>
-      HBaseRecord(i, "old")
-    }
-    // 56 records (200..255) tagged "new".
-    val newData = (200 to 255).map { i =>
-      HBaseRecord(i, "new")
-    }
-
-    // Write the old records with an explicit timestamp of oldMs.
-    // NOTE(review): "populate table" passes HBaseTableCatalog.newTable -> "5"; here the key is
-    // HBaseTableCatalog.tableName -- confirm this option is intended.
-    sc.parallelize(oldData).toDF.write.options(
-      Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5",
-        HBaseSparkConf.TIMESTAMP -> oldMs.toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .save()
-    // Write the new records without a TIMESTAMP option (implicit timestamp).
-    sc.parallelize(newData).toDF.write.options(
-      Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5"))
-      .format("org.apache.hadoop.hbase.spark")
-      .save()
-
-    // Test specific timestamp -- Full scan, Timestamp
-    // Only the 101 rows written at exactly oldMs are expected.
-    val individualTimestamp = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> oldMs.toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-    assert(individualTimestamp.count() == 101)
-
-    // Test getting everything -- Full Scan, No range
-    val everything = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-    assert(everything.count() == 256)
-    // Test getting everything -- Pruned Scan
-    // NOTE(review): this comment used to say "TimeRange", but `everything` is loaded
-    // without any timestamp or time-range option.
-    // row050 is expected to show its "extra" value and row200 its "new" value.
-    val element50 = everything.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
-    assert(element50 == "String50: extra")
-    val element200 = everything.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
-    assert(element200 == "String200: new")
-
-    // Test Getting old stuff -- Full Scan, TimeRange
-    // Range from 0 to oldMs + 100: only the 101 old rows are expected.
-    val oldRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
-        HBaseSparkConf.TIMERANGE_END -> (oldMs + 100).toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-    assert(oldRange.count() == 101)
-    // Test Getting old stuff -- Pruned Scan, TimeRange
-    val oldElement50 = oldRange.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
-    assert(oldElement50 == "String50: old")
-
-    // Test Getting middle stuff -- Full Scan, TimeRange
-    // Range from 0 to startMs + 100: all 256 rows are expected, with row200 still showing
-    // "extra" rather than "new".
-    // NOTE(review): this presumably relies on the "new" write landing more than 100 ms
-    // after startMs was captured -- timing-dependent, confirm.
-    val middleRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
-        HBaseSparkConf.TIMERANGE_END -> (startMs + 100).toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-    assert(middleRange.count() == 256)
-    // Test Getting middle stuff -- Pruned Scan, TimeRange
-    val middleElement200 = middleRange.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
-    assert(middleElement200 == "String200: extra")
-  }
-
-
-  // Catalog used when inserting into "avrotable": both columns are declared as binary.
-  def avroWriteCatalog =
-    """{
-      |"table":{"namespace":"default", "name":"avrotable"},
-      |"rowkey":"key",
-      |"columns":{
-      |"col0":{"cf":"rowkey", "col":"key", "type":"binary"},
-      |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
-      |}
-      |}""".stripMargin
-
-  // Catalog used when reading "avrotable": both columns refer to the "avroSchema" option.
-  def avroCatalog =
-    """{
-      |"table":{"namespace":"default", "name":"avrotable"},
-      |"rowkey":"key",
-      |"columns":{
-      |"col0":{"cf":"rowkey", "col":"key",  "avro":"avroSchema"},
-      |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
-      |}
-      |}""".stripMargin
-
-  // Avro catalog naming a second table, "avrotableInsert", used as an insert target.
-  def avroCatalogInsert =
-    """{
-      |"table":{"namespace":"default", "name":"avrotableInsert"},
-      |"rowkey":"key",
-      |"columns":{
-      |"col0":{"cf":"rowkey", "col":"key", "avro":"avroSchema"},
-      |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
-      |}
-      |}""".stripMargin
-
-  /**
-   * Loads a DataFrame through the "org.apache.hadoop.hbase.spark" data source using the
-   * given catalog, with AvroHBaseKeyRecord.schemaString supplied under the "avroSchema"
-   * option that the Avro catalogs in this suite refer to.
-   *
-   * The catalog passed in is the one actually used; a hard-coded `avroCatalog` here would
-   * make `withAvroCatalog(avroCatalogInsert)` read the wrong table.
-   *
-   * @param cat JSON table catalog, passed under the HBaseTableCatalog.tableCatalog option
-   * @return the DataFrame produced by the reader
-   */
-  def withAvroCatalog(cat: String): DataFrame = {
-    sqlContext
-      .read
-      .options(Map("avroSchema"->AvroHBaseKeyRecord.schemaString,
-        HBaseTableCatalog.tableCatalog->cat))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-  }
-
-
-  // Writes 256 AvroHBaseKeyRecord rows (i = 0..255) through avroWriteCatalog, passing
-  // HBaseTableCatalog.newTable -> "5" to the writer.
-  test("populate avro table") {
-    val sql = sqlContext
-    import sql.implicits._
-
-    val records = for (i <- 0 to 255) yield AvroHBaseKeyRecord(i)
-    val writeOptions = Map(
-      HBaseTableCatalog.tableCatalog -> avroWriteCatalog,
-      HBaseTableCatalog.newTable -> "5")
-    sc.parallelize(records).toDF.write
-      .options(writeOptions)
-      .format("org.apache.hadoop.hbase.spark")
-      .save()
-  }
-
-  // count(1) selects no table column; the full row count of 256 is still expected.
-  test("avro empty column") {
-    withAvroCatalog(avroCatalog).registerTempTable("avrotable")
-    val countRow = sqlContext.sql("select count(1) from avrotable").rdd.collect()(0)
-    val c = countRow(0).asInstanceOf[Long]
-    assert(c == 256)
-  }
-
-  // Unfiltered read through avroCatalog: prints rows and schema, expects 256 rows.
-  test("avro full query") {
-    val avroTable = withAvroCatalog(avroCatalog)
-    avroTable.show()
-    avroTable.printSchema()
-    assert(avroTable.count() == 256)
-  }
-
-  // Reads through avroCatalog, writes the frame out through avroCatalogInsert, then calls
-  // withAvroCatalog(avroCatalogInsert) and expects 256 rows from the resulting frame.
-  test("avro serialization and deserialization query") {
-    val source = withAvroCatalog(avroCatalog)
-    val writeOptions = Map("avroSchema"->AvroHBaseKeyRecord.schemaString,
-      HBaseTableCatalog.tableCatalog->avroCatalogInsert,
-      HBaseTableCatalog.newTable -> "5")
-    source.write
-      .options(writeOptions)
-      .format("org.apache.hadoop.hbase.spark")
-      .save()
-
-    val readBack = withAvroCatalog(avroCatalogInsert)
-    readBack.show()
-    readBack.printSchema()
-    assert(readBack.count() == 256)
-  }
-
-  // Filters on the nested Avro field col1.name (== "name005" OR <= "name005") and
-  // projects three columns; 6 rows are expected.
-  test("avro filtered query") {
-    val sql = sqlContext
-    import sql.implicits._
-    val nameColumn = $"col1.name"
-    val selected = withAvroCatalog(avroCatalog)
-      .filter(nameColumn === "name005" || nameColumn <= "name005")
-      .select("col0", "col1.favorite_color", "col1.favorite_number")
-    selected.show()
-    assert(selected.count() == 6)
-  }
-
-  // Filters on the nested Avro field col1.name (<= "name005" OR contains "name007") and
-  // projects three columns; 7 rows are expected.
-  test("avro Or filter") {
-    val sql = sqlContext
-    import sql.implicits._
-    val nameColumn = $"col1.name"
-    val selected = withAvroCatalog(avroCatalog)
-      .filter(nameColumn <= "name005" || nameColumn.contains("name007"))
-      .select("col0", "col1.favorite_color", "col1.favorite_number")
-    selected.show()
-    assert(selected.count() == 7)
-  }
-}