Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/09 04:59:54 UTC

[8/9] hbase git commit: HBASE-18817 pull the hbase-spark module out of branch-2.

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
deleted file mode 100644
index 54ff658..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.spark.FamilyHFileWriteOptions;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.spark.KeyFamilyQualifier;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * Run this example using the command below:
- *
- *  SPARK_HOME/bin/spark-submit --master local[2] --class org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkLoadExample
- *  path/to/hbase-spark.jar {path/to/output/HFiles}
- *
- * This example will write HFiles to {path/to/output/HFiles}; the user can then run
- * 'hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles' to load those HFiles into the table and verify this example.
- */
-final public class JavaHBaseBulkLoadExample {
-  private JavaHBaseBulkLoadExample() {}
-
-  public static void main(String[] args) {
-    if (args.length < 1) {
-      System.out.println("JavaHBaseBulkLoadExample  " + "{outputPath}");
-      return;
-    }
-
-    String tableName = "bulkload-table-test";
-    String columnFamily1 = "f1";
-    String columnFamily2 = "f2";
-
-    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkLoadExample " + tableName);
-    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
-    try {
-      List<String> list= new ArrayList<String>();
-      // row1
-      list.add("1," + columnFamily1 + ",b,1");
-      // row3
-      list.add("3," + columnFamily1 + ",a,2");
-      list.add("3," + columnFamily1 + ",b,1");
-      list.add("3," + columnFamily2 + ",a,1");
-      /* row2 */
-      list.add("2," + columnFamily2 + ",a,3");
-      list.add("2," + columnFamily2 + ",b,3");
-
-      JavaRDD<String> rdd = jsc.parallelize(list);
-
-      Configuration conf = HBaseConfiguration.create();
-      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-
-
-      hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(), args[0],
-          new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
-    } finally {
-      jsc.stop();
-    }
-  }
-
-  public static class BulkLoadFunction implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {
-
-    @Override
-    public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
-      if (v1 == null)
-        return null;
-      String[] strs = v1.split(",");
-      if (strs.length != 4)
-        return null;
-      KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]), Bytes.toBytes(strs[1]),
-          Bytes.toBytes(strs[2]));
-      return new Pair<>(kfq, Bytes.toBytes(strs[3]));
-    }
-  }
-}

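For reference, the load step that the Javadoc above describes via the CLI can also be driven programmatically. A minimal sketch, assuming the HBase 2.x org.apache.hadoop.hbase.tool.LoadIncrementalHFiles class and its doBulkLoad(Path, Admin, Table, RegionLocator) overload; the table name and output-path argument mirror the example above, and the sketch's class name is illustrative:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

  public final class CompleteBulkLoadSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      TableName table = TableName.valueOf("bulkload-table-test");
      try (Connection conn = ConnectionFactory.createConnection(conf)) {
        // Moves the HFiles written by JavaHBaseBulkLoadExample under args[0] into the table's regions.
        new LoadIncrementalHFiles(conf).doBulkLoad(new Path(args[0]), conn.getAdmin(),
            conn.getTable(table), conn.getRegionLocator(table));
      }
    }
  }

The CLI form mentioned in the Javadoc, given the output directory and the table name, should accomplish the same thing.
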
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
deleted file mode 100644
index 5821c19..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-/**
- * This is a simple example of putting records in HBase
- * with the bulkPut function.
- */
-final public class JavaHBaseBulkPutExample {
-
-  private JavaHBaseBulkPutExample() {}
-
-  public static void main(String[] args) {
-    if (args.length < 2) {
-      System.out.println("JavaHBaseBulkPutExample " +
-              "{tableName} {columnFamily}");
-      return;
-    }
-
-    String tableName = args[0];
-    String columnFamily = args[1];
-
-    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkPutExample " + tableName);
-    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
-    try {
-      List<String> list = new ArrayList<>(5);
-      list.add("1," + columnFamily + ",a,1");
-      list.add("2," + columnFamily + ",a,2");
-      list.add("3," + columnFamily + ",a,3");
-      list.add("4," + columnFamily + ",a,4");
-      list.add("5," + columnFamily + ",a,5");
-
-      JavaRDD<String> rdd = jsc.parallelize(list);
-
-      Configuration conf = HBaseConfiguration.create();
-
-      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-      hbaseContext.bulkPut(rdd,
-              TableName.valueOf(tableName),
-              new PutFunction());
-    } finally {
-      jsc.stop();
-    }
-  }
-
-  public static class PutFunction implements Function<String, Put> {
-
-    private static final long serialVersionUID = 1L;
-
-    public Put call(String v) throws Exception {
-      String[] cells = v.split(",");
-      Put put = new Put(Bytes.toBytes(cells[0]));
-
-      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
-              Bytes.toBytes(cells[3]));
-      return put;
-    }
-
-  }
-}

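For comparison, the same rows could be written with the plain HBase client; bulkPut essentially performs this per Spark partition, creating and closing the connection on the caller's behalf. A minimal sketch, where the class name and the two sample rows are illustrative rather than part of the original example:

  import java.util.Arrays;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.BufferedMutator;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  public final class PlainClientPutSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (Connection conn = ConnectionFactory.createConnection(conf);
           BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf(args[0]))) {
        // Same "rowKey,family,qualifier,value" layout that PutFunction above parses.
        for (String line : Arrays.asList("1," + args[1] + ",a,1", "2," + args[1] + ",a,2")) {
          String[] cells = line.split(",");
          Put put = new Put(Bytes.toBytes(cells[0]));
          put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]), Bytes.toBytes(cells[3]));
          mutator.mutate(put);
        }
        mutator.flush();
      }
    }
  }
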
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
deleted file mode 100644
index 8d4c092..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import org.apache.spark.api.java.function.Function;
-import scala.Tuple2;
-
-/**
- * This is a simple example of scanning records from HBase
- * with the hbaseRDD function.
- */
-final public class JavaHBaseDistributedScan {
-
-  private JavaHBaseDistributedScan() {}
-
-  public static void main(String[] args) {
-    if (args.length < 1) {
-      System.out.println("JavaHBaseDistributedScan {tableName}");
-      return;
-    }
-
-    String tableName = args[0];
-
-    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseDistributedScan " + tableName);
-    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
-    try {
-      Configuration conf = HBaseConfiguration.create();
-
-      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-      Scan scan = new Scan();
-      scan.setCaching(100);
-
-      JavaRDD<Tuple2<ImmutableBytesWritable, Result>> javaRdd =
-              hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan);
-
-      List<String> results = javaRdd.map(new ScanConvertFunction()).collect();
-
-      System.out.println("Result Size: " + results.size());
-    } finally {
-      jsc.stop();
-    }
-  }
-
-  private static class ScanConvertFunction implements
-          Function<Tuple2<ImmutableBytesWritable, Result>, String> {
-    @Override
-    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
-      return Bytes.toString(v1._1().copyBytes());
-    }
-  }
-}

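The ScanConvertFunction above only keeps the row key of each Result. If cell values are needed, the same mapping can pull them out with Result.getValue; a minimal sketch follows, where the "f1"/"a" column coordinates are placeholders rather than anything defined by the original example:

  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.spark.api.java.function.Function;
  import scala.Tuple2;

  // Variant of ScanConvertFunction that also reads one cell value from each Result.
  public class RowAndValueFunction
      implements Function<Tuple2<ImmutableBytesWritable, Result>, String> {
    @Override
    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
      Result result = v1._2();
      byte[] value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("a"));
      return Bytes.toString(result.getRow()) + "="
          + (value == null ? "<no value>" : Bytes.toString(value));
    }
  }
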
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
deleted file mode 100644
index 316f8a1..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
-
-import scala.Tuple2;
-
-/**
- * This is a simple example of using the foreachPartition
- * method with a HBase connection
- */
-final public class JavaHBaseMapGetPutExample {
-
-  private JavaHBaseMapGetPutExample() {}
-
-  public static void main(String[] args) {
-    if (args.length < 1) {
-      System.out.println("JavaHBaseMapGetPutExample {tableName}");
-      return;
-    }
-
-    final String tableName = args[0];
-
-    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseMapGetPutExample " + tableName);
-    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
-    try {
-      List<byte[]> list = new ArrayList<>(5);
-      list.add(Bytes.toBytes("1"));
-      list.add(Bytes.toBytes("2"));
-      list.add(Bytes.toBytes("3"));
-      list.add(Bytes.toBytes("4"));
-      list.add(Bytes.toBytes("5"));
-
-      JavaRDD<byte[]> rdd = jsc.parallelize(list);
-      Configuration conf = HBaseConfiguration.create();
-
-      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-      hbaseContext.foreachPartition(rdd,
-              new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
-        public void call(Tuple2<Iterator<byte[]>, Connection> t)
-                throws Exception {
-          Table table = t._2().getTable(TableName.valueOf(tableName));
-          BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
-
-          while (t._1().hasNext()) {
-            byte[] b = t._1().next();
-            Result r = table.get(new Get(b));
-            if (r.getExists()) {
-              mutator.mutate(new Put(b));
-            }
-          }
-
-          mutator.flush();
-          mutator.close();
-          table.close();
-        }
-      });
-    } finally {
-      jsc.stop();
-    }
-  }
-
-  public static class GetFunction implements Function<byte[], Get> {
-    private static final long serialVersionUID = 1L;
-    public Get call(byte[] v) throws Exception {
-      return new Get(v);
-    }
-  }
-}

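The anonymous partition function above closes its Table and BufferedMutator only on the success path, and Result.getExists() is only populated when a Get is marked existence-only, so Result.isEmpty() is the safer check for a plain Get. A sketch of the same per-partition logic restructured with try-with-resources; the "f1"/"a" column written back is a placeholder, not something the original example defines:

  import java.util.Iterator;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.BufferedMutator;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.Get;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.spark.api.java.function.VoidFunction;
  import scala.Tuple2;

  public class GetThenPutPartitionFunction
      implements VoidFunction<Tuple2<Iterator<byte[]>, Connection>> {
    private final String tableName;

    public GetThenPutPartitionFunction(String tableName) {
      this.tableName = tableName;
    }

    @Override
    public void call(Tuple2<Iterator<byte[]>, Connection> t) throws Exception {
      // try-with-resources keeps the Table and BufferedMutator closed even if a Get fails.
      try (Table table = t._2().getTable(TableName.valueOf(tableName));
           BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName))) {
        while (t._1().hasNext()) {
          byte[] b = t._1().next();
          Result r = table.get(new Get(b));
          if (!r.isEmpty()) {
            Put put = new Put(b);
            // Copy the first cell's value back under placeholder coordinates "f1":"a".
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("a"), r.value());
            mutator.mutate(put);
          }
        }
        mutator.flush();
      }
    }
  }
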
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
deleted file mode 100644
index cd4cf24..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-/**
- * This is a simple example of BulkPut with Spark Streaming
- */
-final public class JavaHBaseStreamingBulkPutExample {
-
-  private JavaHBaseStreamingBulkPutExample() {}
-
-  public static void main(String[] args) {
-    if (args.length < 3) {
-      System.out.println("JavaHBaseStreamingBulkPutExample " +
-              "{host} {port} {tableName}");
-      return;
-    }
-
-    String host = args[0];
-    String port = args[1];
-    String tableName = args[2];
-
-    SparkConf sparkConf =
-            new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
-                    host + ":" + port + ":" + tableName);
-
-    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
-    try {
-      JavaStreamingContext jssc =
-              new JavaStreamingContext(jsc, new Duration(1000));
-
-      JavaReceiverInputDStream<String> javaDstream =
-              jssc.socketTextStream(host, Integer.parseInt(port));
-
-      Configuration conf = HBaseConfiguration.create();
-
-      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-      hbaseContext.streamBulkPut(javaDstream,
-              TableName.valueOf(tableName),
-              new PutFunction());
-
-      // Without starting the streaming context, the bulk puts above are never executed.
-      jssc.start();
-      jssc.awaitTerminationOrTimeout(60000);
-    } finally {
-      jsc.stop();
-    }
-  }
-
-  public static class PutFunction implements Function<String, Put> {
-
-    private static final long serialVersionUID = 1L;
-
-    public Put call(String v) throws Exception {
-      String[] part = v.split(",");
-      Put put = new Put(Bytes.toBytes(part[0]));
-
-      put.addColumn(Bytes.toBytes(part[1]),
-              Bytes.toBytes(part[2]),
-              Bytes.toBytes(part[3]));
-      return put;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/protobuf/SparkFilter.proto
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/protobuf/SparkFilter.proto b/hbase-spark/src/main/protobuf/SparkFilter.proto
deleted file mode 100644
index e16c551..0000000
--- a/hbase-spark/src/main/protobuf/SparkFilter.proto
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// This file contains protocol buffers that are used for Spark filters
-// over in the hbase-spark module
-package hbase.pb;
-
-option java_package = "org.apache.hadoop.hbase.spark.protobuf.generated";
-option java_outer_classname = "SparkFilterProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-option optimize_for = SPEED;
-
-message SQLPredicatePushDownCellToColumnMapping {
-  required bytes column_family = 1;
-  required bytes qualifier = 2;
-  required string column_name = 3;
-}
-
-message SQLPredicatePushDownFilter {
-  required string dynamic_logic_expression = 1;
-  repeated bytes value_from_query_array = 2;
-  repeated SQLPredicatePushDownCellToColumnMapping cell_to_column_mapping = 3;
-  optional string encoderClassName = 4;
-}

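The Java classes generated from this .proto follow the standard protocol-buffers builder pattern under the declared SparkFilterProtos outer class. A minimal sketch of assembling a pushdown filter message, assuming the stock com.google.protobuf runtime; the expression string, value, and column coordinates are made-up placeholders:

  import com.google.protobuf.ByteString;
  import org.apache.hadoop.hbase.spark.protobuf.generated.SparkFilterProtos;
  import org.apache.hadoop.hbase.util.Bytes;

  public final class SparkFilterProtoSketch {
    public static void main(String[] args) {
      SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping mapping =
          SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping.newBuilder()
              .setColumnFamily(ByteString.copyFrom(Bytes.toBytes("f1")))
              .setQualifier(ByteString.copyFrom(Bytes.toBytes("a")))
              .setColumnName("col_a")
              .build();

      SparkFilterProtos.SQLPredicatePushDownFilter filter =
          SparkFilterProtos.SQLPredicatePushDownFilter.newBuilder()
              .setDynamicLogicExpression("( col_a == 0 )")   // placeholder expression string
              .addValueFromQueryArray(ByteString.copyFrom(Bytes.toBytes(1)))
              .addCellToColumnMapping(mapping)
              .build();

      // This byte[] is what a client would ship to the region servers as the filter payload.
      byte[] serialized = filter.toByteArray();
      System.out.println("serialized filter is " + serialized.length + " bytes");
    }
  }
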
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
deleted file mode 100644
index 9442c50..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util
-import java.util.Comparator
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.Partitioner
-
-/**
- * A Partitioner implementation that routes each record to the
- * HBase region that will host it, based on the table's region split keys.
- *
- * @param startKeys   The start keys for the given table
- */
-@InterfaceAudience.Public
-class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
-  extends Partitioner {
-  // if the table does not exist, startKeys is an empty array, so fall back to a single partition
-  override def numPartitions: Int = if (startKeys.length == 0) 1 else startKeys.length
-
-  override def getPartition(key: Any): Int = {
-
-    val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
-      override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
-        Bytes.compareTo(o1, o2)
-      }
-    }
-
-    val rowKey:Array[Byte] =
-      key match {
-        case qualifier: KeyFamilyQualifier =>
-          qualifier.rowKey
-        case wrapper: ByteArrayWrapper =>
-          wrapper.value
-        case _ =>
-          key.asInstanceOf[Array[Byte]]
-      }
-    var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
-    if (partition < 0)
-      partition = partition * -1 + -2
-    if (partition < 0)
-      partition = 0
-    partition
-  }
-}

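The arithmetic in getPartition above relies on java.util.Arrays.binarySearch returning -(insertionPoint) - 1 on a miss: partition * -1 + -2 works out to insertionPoint - 1, i.e. the region whose start key is closest at or before the row key, with anything sorting before the first start key clamped to partition 0. The same logic in plain Java, with an illustrative two-region split (the class and method names are not from the module):

  import java.util.Arrays;
  import org.apache.hadoop.hbase.util.Bytes;

  public final class RegionIndexSketch {
    // Mirrors BulkLoadPartitioner.getPartition for a plain byte[] row key.
    static int regionIndex(byte[][] startKeys, byte[] rowKey) {
      int idx = Arrays.binarySearch(startKeys, rowKey, Bytes.BYTES_COMPARATOR);
      if (idx < 0) {
        // binarySearch returned -(insertionPoint) - 1, so this is insertionPoint - 1:
        // the start key immediately preceding the row key.
        idx = idx * -1 + -2;
      }
      if (idx < 0) {
        // Row key sorts before every start key; send it to the first partition.
        idx = 0;
      }
      return idx;
    }

    public static void main(String[] args) {
      byte[][] startKeys = { Bytes.toBytes(""), Bytes.toBytes("m") };
      System.out.println(regionIndex(startKeys, Bytes.toBytes("c")));  // 0
      System.out.println(regionIndex(startKeys, Bytes.toBytes("z")));  // 1
    }
  }
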
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala
deleted file mode 100644
index 2d0be38..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes
-
-@InterfaceAudience.Public
-class ByteArrayComparable(val bytes:Array[Byte], val offset:Int = 0, var length:Int = -1)
-  extends Comparable[ByteArrayComparable] {
-
-  if (length == -1) {
-    length = bytes.length
-  }
-
-  override def compareTo(o: ByteArrayComparable): Int = {
-    Bytes.compareTo(bytes, offset, length, o.bytes, o.offset, o.length)
-  }
-
-  override def hashCode(): Int = {
-    Bytes.hashCode(bytes, offset, length)
-  }
-
-  override def equals (obj: Any): Boolean = {
-    obj match {
-      case b: ByteArrayComparable =>
-        Bytes.equals(bytes, offset, length, b.bytes, b.offset, b.length)
-      case _ =>
-        false
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala
deleted file mode 100644
index 738fa45..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark
-
-import java.io.Serializable
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * This is a wrapper over a byte array so it can work as
- * a key in a hashMap
- *
- * @param value The Byte Array value
- */
-@InterfaceAudience.Public
-class ByteArrayWrapper (var value:Array[Byte])
-  extends Comparable[ByteArrayWrapper] with Serializable {
-  override def compareTo(valueOther: ByteArrayWrapper): Int = {
-    Bytes.compareTo(value,valueOther.value)
-  }
-  override def equals(o2: Any): Boolean = {
-    o2 match {
-      case wrapper: ByteArrayWrapper =>
-        Bytes.equals(value, wrapper.value)
-      case _ =>
-        false
-    }
-  }
-  override def hashCode():Int = {
-    Bytes.hashCode(value)
-  }
-}

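The reason the wrapper is needed: a raw byte[] key falls back to identity-based hashCode/equals, so a logically equal key never matches in a HashMap, while ByteArrayWrapper delegates both to the Bytes utilities. A small sketch of the difference (the sketch's class name is illustrative):

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.hadoop.hbase.spark.ByteArrayWrapper;
  import org.apache.hadoop.hbase.util.Bytes;

  public final class ByteArrayKeySketch {
    public static void main(String[] args) {
      // Raw byte[] keys compare by reference, so the second lookup misses.
      Map<byte[], String> raw = new HashMap<>();
      raw.put(Bytes.toBytes("row1"), "value");
      System.out.println(raw.get(Bytes.toBytes("row1")));   // null

      // ByteArrayWrapper hashes and compares the contents, so the lookup hits.
      Map<ByteArrayWrapper, String> wrapped = new HashMap<>();
      wrapped.put(new ByteArrayWrapper(Bytes.toBytes("row1")), "value");
      System.out.println(wrapped.get(new ByteArrayWrapper(Bytes.toBytes("row1"))));  // value
    }
  }
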
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala
deleted file mode 100644
index 3037001..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * A wrapper class that allows both columnFamily and qualifier to
- * be the key of a hashMap.  It also allows finding a value in a hashMap
- * without cloning the HBase value from the HBase Cell object.
- * @param columnFamily       ColumnFamily byte array
- * @param columnFamilyOffSet Offset of columnFamily value in the array
- * @param columnFamilyLength Length of the columnFamily value in the columnFamily array
- * @param qualifier          Qualifier byte array
- * @param qualifierOffSet    Offset of qualifier value in the array
- * @param qualifierLength    Length of the qualifier value with in the array
- */
-@InterfaceAudience.Public
-class ColumnFamilyQualifierMapKeyWrapper(val columnFamily:Array[Byte],
-                                         val columnFamilyOffSet:Int,
-                                         val columnFamilyLength:Int,
-                                         val qualifier:Array[Byte],
-                                         val qualifierOffSet:Int,
-                                         val qualifierLength:Int)
-  extends Serializable{
-
-  override def equals(other:Any): Boolean = {
-    val otherWrapper = other.asInstanceOf[ColumnFamilyQualifierMapKeyWrapper]
-
-    Bytes.compareTo(columnFamily,
-      columnFamilyOffSet,
-      columnFamilyLength,
-      otherWrapper.columnFamily,
-      otherWrapper.columnFamilyOffSet,
-      otherWrapper.columnFamilyLength) == 0 && Bytes.compareTo(qualifier,
-        qualifierOffSet,
-        qualifierLength,
-        otherWrapper.qualifier,
-        otherWrapper.qualifierOffSet,
-        otherWrapper.qualifierLength) == 0
-  }
-
-  override def hashCode():Int = {
-    Bytes.hashCode(columnFamily, columnFamilyOffSet, columnFamilyLength) +
-      Bytes.hashCode(qualifier, qualifierOffSet, qualifierLength)
-  }
-
-  def cloneColumnFamily():Array[Byte] = {
-    val resultArray = new Array[Byte](columnFamilyLength)
-    System.arraycopy(columnFamily, columnFamilyOffSet, resultArray, 0, columnFamilyLength)
-    resultArray
-  }
-
-  def cloneQualifier():Array[Byte] = {
-    val resultArray = new Array[Byte](qualifierLength)
-    System.arraycopy(qualifier, qualifierOffSet, resultArray, 0, qualifierLength)
-    resultArray
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
deleted file mode 100644
index a488dd3..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ /dev/null
@@ -1,1224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.mapred.TableOutputFormat
-import org.apache.hadoop.hbase.spark.datasources._
-import org.apache.hadoop.hbase.types._
-import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, SimplePositionedMutableByteRange}
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.HTableDescriptor
-import org.apache.hadoop.hbase.HColumnDescriptor
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.CellUtil
-import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.datasources.hbase.{Utils, Field, HBaseTableCatalog}
-import org.apache.spark.sql.{DataFrame, SaveMode, Row, SQLContext}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
-
-import scala.collection.mutable
-
-/**
- * DefaultSource for integration with Spark's dataframe datasources.
- * This class will produce a relationProvider based on input given to it from Spark.
- *
- * This class needs to stay in the current package 'org.apache.hadoop.hbase.spark'
- * for Spark to match the hbase data source name.
- *
- * Altogether, this DefaultSource supports the following datasource functionality:
- * - Scan range pruning through filter push down logic based on rowKeys
- * - Filter push down logic on HBase Cells
- * - Qualifier filtering based on columns used in the SparkSQL statement
- * - Type conversions of basic SQL types.  All conversions are done
- *   through the HBase Bytes object commands.
- */
-@InterfaceAudience.Private
-class DefaultSource extends RelationProvider  with CreatableRelationProvider with Logging {
-  /**
-   * Given input from SparkSQL, constructs a BaseRelation.
-   *
-   * @param sqlContext SparkSQL context
-   * @param parameters Parameters given to us from SparkSQL
-   * @return           A BaseRelation Object
-   */
-  override def createRelation(sqlContext: SQLContext,
-                              parameters: Map[String, String]):
-  BaseRelation = {
-    new HBaseRelation(parameters, None)(sqlContext)
-  }
-
-
-  override def createRelation(
-      sqlContext: SQLContext,
-      mode: SaveMode,
-      parameters: Map[String, String],
-      data: DataFrame): BaseRelation = {
-    val relation = HBaseRelation(parameters, Some(data.schema))(sqlContext)
-    relation.createTable()
-    relation.insert(data, false)
-    relation
-  }
-}
-
-/**
- * Implementation of Spark BaseRelation that builds up our scan logic,
- * does the scan pruning, filter push down, and value conversions.
- *
- * @param sqlContext              SparkSQL context
- */
-@InterfaceAudience.Private
-case class HBaseRelation (
-    @transient parameters: Map[String, String],
-    userSpecifiedSchema: Option[StructType]
-  )(@transient val sqlContext: SQLContext)
-  extends BaseRelation with PrunedFilteredScan  with InsertableRelation  with Logging {
-  val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
-  val minTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_START).map(_.toLong)
-  val maxTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_END).map(_.toLong)
-  val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
-  val encoderClsName = parameters.get(HBaseSparkConf.QUERY_ENCODER).getOrElse(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
-
-  @transient val encoder = JavaBytesEncoder.create(encoderClsName)
-
-  val catalog = HBaseTableCatalog(parameters)
-  def tableName = catalog.name
-  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_LOCATION, "")
-  val useHBaseContext =  parameters.get(HBaseSparkConf.USE_HBASECONTEXT).map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_USE_HBASECONTEXT)
-  val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSHDOWN_COLUMN_FILTER)
-    .map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_PUSHDOWN_COLUMN_FILTER)
-
-  // The user supplied per table parameter will overwrite global ones in SparkConf
-  val blockCacheEnable = parameters.get(HBaseSparkConf.QUERY_CACHEBLOCKS).map(_.toBoolean)
-    .getOrElse(
-      sqlContext.sparkContext.getConf.getBoolean(
-        HBaseSparkConf.QUERY_CACHEBLOCKS, HBaseSparkConf.DEFAULT_QUERY_CACHEBLOCKS))
-  val cacheSize = parameters.get(HBaseSparkConf.QUERY_CACHEDROWS).map(_.toInt)
-    .getOrElse(
-      sqlContext.sparkContext.getConf.getInt(
-      HBaseSparkConf.QUERY_CACHEDROWS, -1))
-  val batchNum = parameters.get(HBaseSparkConf.QUERY_BATCHSIZE).map(_.toInt)
-    .getOrElse(sqlContext.sparkContext.getConf.getInt(
-    HBaseSparkConf.QUERY_BATCHSIZE,  -1))
-
-  val bulkGetSize =  parameters.get(HBaseSparkConf.BULKGET_SIZE).map(_.toInt)
-    .getOrElse(sqlContext.sparkContext.getConf.getInt(
-    HBaseSparkConf.BULKGET_SIZE,  HBaseSparkConf.DEFAULT_BULKGET_SIZE))
-
-  //create or get latest HBaseContext
-  val hbaseContext:HBaseContext = if (useHBaseContext) {
-    LatestHBaseContextCache.latest
-  } else {
-    val config = HBaseConfiguration.create()
-    configResources.split(",").foreach( r => config.addResource(r))
-    new HBaseContext(sqlContext.sparkContext, config)
-  }
-
-  val wrappedConf = new SerializableConfiguration(hbaseContext.config)
-  def hbaseConf = wrappedConf.value
-
-  /**
-   * Generates a Spark SQL schema object so Spark SQL knows what is being
-   * provided by this BaseRelation
-   *
-   * @return schema generated from the SCHEMA_COLUMNS_MAPPING_KEY value
-   */
-  override val schema: StructType = userSpecifiedSchema.getOrElse(catalog.toDataType)
-
-
-
-  def createTable() {
-    val numReg = parameters.get(HBaseTableCatalog.newTable).map(x => x.toInt).getOrElse(0)
-    val startKey =  Bytes.toBytes(
-      parameters.get(HBaseTableCatalog.regionStart)
-        .getOrElse(HBaseTableCatalog.defaultRegionStart))
-    val endKey = Bytes.toBytes(
-      parameters.get(HBaseTableCatalog.regionEnd)
-        .getOrElse(HBaseTableCatalog.defaultRegionEnd))
-    if (numReg > 3) {
-      val tName = TableName.valueOf(catalog.name)
-      val cfs = catalog.getColumnFamilies
-
-      val connection = HBaseConnectionCache.getConnection(hbaseConf)
-      // Initialize hBase table if necessary
-      val admin = connection.getAdmin
-      try {
-        if (!admin.isTableAvailable(tName)) {
-          val tableDesc = new HTableDescriptor(tName)
-          cfs.foreach { x =>
-            val cf = new HColumnDescriptor(x.getBytes())
-            logDebug(s"add family $x to ${catalog.name}")
-            tableDesc.addFamily(cf)
-          }
-          val splitKeys = Bytes.split(startKey, endKey, numReg);
-          admin.createTable(tableDesc, splitKeys)
-
-        }
-      }finally {
-        admin.close()
-        connection.close()
-      }
-    } else {
-      logInfo(
-        s"""${HBaseTableCatalog.newTable}
-           |is not defined or is not larger than 3; skipping table creation""".stripMargin)
-    }
-  }
-
-  /**
-    * Writes the given DataFrame out to the HBase table behind this relation.
-    * @param data      DataFrame to write
-    * @param overwrite Whether existing data should be overwritten
-    */
-  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-    val jobConfig: JobConf = new JobConf(hbaseConf, this.getClass)
-    jobConfig.setOutputFormat(classOf[TableOutputFormat])
-    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, catalog.name)
-    var count = 0
-    val rkFields = catalog.getRowKey
-    val rkIdxedFields = rkFields.map{ case x =>
-      (schema.fieldIndex(x.colName), x)
-    }
-    val colsIdxedFields = schema
-      .fieldNames
-      .partition( x => rkFields.map(_.colName).contains(x))
-      ._2.map(x => (schema.fieldIndex(x), catalog.getField(x)))
-    val rdd = data.rdd
-    def convertToPut(row: Row) = {
-      // construct bytes for row key
-      val rowBytes = rkIdxedFields.map { case (x, y) =>
-        Utils.toBytes(row(x), y)
-      }
-      val rLen = rowBytes.foldLeft(0) { case (x, y) =>
-        x + y.length
-      }
-      val rBytes = new Array[Byte](rLen)
-      var offset = 0
-      rowBytes.foreach { x =>
-        System.arraycopy(x, 0, rBytes, offset, x.length)
-        offset += x.length
-      }
-      val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _))
-
-      colsIdxedFields.foreach { case (x, y) =>
-        val b = Utils.toBytes(row(x), y)
-        put.addColumn(Bytes.toBytes(y.cf), Bytes.toBytes(y.col), b)
-      }
-      count += 1
-      (new ImmutableBytesWritable, put)
-    }
-    rdd.map(convertToPut(_)).saveAsHadoopDataset(jobConfig)
-  }
-
-  def getIndexedProjections(requiredColumns: Array[String]): Seq[(Field, Int)] = {
-    requiredColumns.map(catalog.sMap.getField(_)).zipWithIndex
-  }
-
-
-  /**
-    * Takes an HBase row key (as a byte array) and parses all of the key fields from it.
-    * This is independent of which fields were requested from the key:
-    * because we have all the data, it's less complex to parse everything.
-    *
-    * @param row the retrieved row from hbase.
-    * @param keyFields all of the fields in the row key, ORDERED by their order in the row key.
-    */
-  def parseRowKey(row: Array[Byte], keyFields: Seq[Field]): Map[Field, Any] = {
-    keyFields.foldLeft((0, Seq[(Field, Any)]()))((state, field) => {
-      val idx = state._1
-      val parsed = state._2
-      if (field.length != -1) {
-        val value = Utils.hbaseFieldToScalaType(field, row, idx, field.length)
-        // Return the new index and appended value
-        (idx + field.length, parsed ++ Seq((field, value)))
-      } else {
-        field.dt match {
-          case StringType =>
-            val pos = row.indexOf(HBaseTableCatalog.delimiter, idx)
-            if (pos == -1 || pos > row.length) {
-              // this is at the last dimension
-              val value = Utils.hbaseFieldToScalaType(field, row, idx, row.length)
-              (row.length + 1, parsed ++ Seq((field, value)))
-            } else {
-              val value = Utils.hbaseFieldToScalaType(field, row, idx, pos - idx)
-              (pos, parsed ++ Seq((field, value)))
-            }
-          // We don't know the length, assume it extends to the end of the rowkey.
-          case _ => (row.length + 1, parsed ++ Seq((field, Utils.hbaseFieldToScalaType(field, row, idx, row.length))))
-        }
-      }
-    })._2.toMap
-  }
-
-  def buildRow(fields: Seq[Field], result: Result): Row = {
-    val r = result.getRow
-    val keySeq = parseRowKey(r, catalog.getRowKey)
-    val valueSeq = fields.filter(!_.isRowKey).map { x =>
-      val kv = result.getColumnLatestCell(Bytes.toBytes(x.cf), Bytes.toBytes(x.col))
-      if (kv == null || kv.getValueLength == 0) {
-        (x, null)
-      } else {
-        val v = CellUtil.cloneValue(kv)
-        (x, x.dt match {
-          // Here, to avoid arraycopy, return v directly instead of calling hbaseFieldToScalaType
-          case BinaryType => v
-          case _ => Utils.hbaseFieldToScalaType(x, v, 0, v.length)
-        })
-      }
-    }.toMap
-    val unionedRow = keySeq ++ valueSeq
-    // Return the row ordered by the requested order
-    Row.fromSeq(fields.map(unionedRow.get(_).getOrElse(null)))
-  }
-
-  /**
-   * Builds the functionality to populate the resulting RDD[Row].
-   * This is where we do the following:
-   * - Filter push down
-   * - Scan or GetList pruning
-   * - Executing our scan(s) and/or GetList to generate the result
-   *
-   * @param requiredColumns The columns that are being requested by the requesting query
-   * @param filters         The filters that are being applied by the requesting query
-   * @return                RDD with all the results from HBase needed for SparkSQL to
-   *                        execute the query on
-   */
-  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-
-    val pushDownTuple = buildPushDownPredicatesResource(filters)
-    val pushDownRowKeyFilter = pushDownTuple._1
-    var pushDownDynamicLogicExpression = pushDownTuple._2
-    val valueArray = pushDownTuple._3
-
-    if (!usePushDownColumnFilter) {
-      pushDownDynamicLogicExpression = null
-    }
-
-    logDebug("pushDownRowKeyFilter:           " + pushDownRowKeyFilter.ranges)
-    if (pushDownDynamicLogicExpression != null) {
-      logDebug("pushDownDynamicLogicExpression: " +
-        pushDownDynamicLogicExpression.toExpressionString)
-    }
-    logDebug("valueArray:                     " + valueArray.length)
-
-    val requiredQualifierDefinitionList =
-      new mutable.MutableList[Field]
-
-    requiredColumns.foreach( c => {
-      val field = catalog.getField(c)
-      requiredQualifierDefinitionList += field
-    })
-
-    //retain the information for unit testing checks
-    DefaultSourceStaticUtils.populateLatestExecutionRules(pushDownRowKeyFilter,
-      pushDownDynamicLogicExpression)
-
-    val getList = new util.ArrayList[Get]()
-    val rddList = new util.ArrayList[RDD[Row]]()
-
-    //add points to getList
-    pushDownRowKeyFilter.points.foreach(p => {
-      val get = new Get(p)
-      requiredQualifierDefinitionList.foreach( d => {
-        if (d.isRowKey)
-          get.addColumn(d.cfBytes, d.colBytes)
-      })
-      getList.add(get)
-    })
-
-    val pushDownFilterJava = if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
-        Some(new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
-          valueArray, requiredQualifierDefinitionList, encoderClsName))
-    } else {
-      None
-    }
-    val hRdd = new HBaseTableScanRDD(this, hbaseContext, pushDownFilterJava, requiredQualifierDefinitionList.seq)
-    pushDownRowKeyFilter.points.foreach(hRdd.addPoint(_))
-    pushDownRowKeyFilter.ranges.foreach(hRdd.addRange(_))
-
-    var resultRDD: RDD[Row] = {
-      val tmp = hRdd.map{ r =>
-        val indexedFields = getIndexedProjections(requiredColumns).map(_._1)
-        buildRow(indexedFields, r)
-
-      }
-      if (tmp.partitions.size > 0) {
-        tmp
-      } else {
-        null
-      }
-    }
-
-    if (resultRDD == null) {
-      val scan = new Scan()
-      scan.setCacheBlocks(blockCacheEnable)
-      scan.setBatch(batchNum)
-      scan.setCaching(cacheSize)
-      requiredQualifierDefinitionList.foreach( d =>
-        scan.addColumn(d.cfBytes, d.colBytes))
-
-      val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
-        val indexedFields = getIndexedProjections(requiredColumns).map(_._1)
-        buildRow(indexedFields, r._2)
-      })
-      resultRDD=rdd
-    }
-    resultRDD
-  }
-
-  def buildPushDownPredicatesResource(filters: Array[Filter]):
-  (RowKeyFilter, DynamicLogicExpression, Array[Array[Byte]]) = {
-    var superRowKeyFilter:RowKeyFilter = null
-    val queryValueList = new mutable.MutableList[Array[Byte]]
-    var superDynamicLogicExpression: DynamicLogicExpression = null
-
-    filters.foreach( f => {
-      val rowKeyFilter = new RowKeyFilter()
-      val logicExpression = transverseFilterTree(rowKeyFilter, queryValueList, f)
-      if (superDynamicLogicExpression == null) {
-        superDynamicLogicExpression = logicExpression
-        superRowKeyFilter = rowKeyFilter
-      } else {
-        superDynamicLogicExpression =
-          new AndLogicExpression(superDynamicLogicExpression, logicExpression)
-        superRowKeyFilter.mergeIntersect(rowKeyFilter)
-      }
-
-    })
-
-    val queryValueArray = queryValueList.toArray
-
-    if (superRowKeyFilter == null) {
-      superRowKeyFilter = new RowKeyFilter
-    }
-
-    (superRowKeyFilter, superDynamicLogicExpression, queryValueArray)
-  }
-
-  /**
-    * For some codecs, the ordering may be inconsistent between a java primitive
-    * type and its byte-array encoding. We may have to split a predicate on such
-    * a java primitive type into multiple predicates. The encoder will take
-    * care of it and return the concrete ranges.
-    *
-    * For example, in the naive codec some of the java primitive types have to be split into multiple
-    * predicates, and these predicates unioned together so that they are evaluated correctly.
-    * For example, if we have "COLUMN < 2", we will transform it into
-    * "0 <= COLUMN < 2 OR Integer.MIN_VALUE <= COLUMN <= -1"
-    */
-
-  def transverseFilterTree(parentRowKeyFilter:RowKeyFilter,
-                                  valueArray:mutable.MutableList[Array[Byte]],
-                                  filter:Filter): DynamicLogicExpression = {
-    filter match {
-      case EqualTo(attr, value) =>
-        val field = catalog.getField(attr)
-        if (field != null) {
-          if (field.isRowKey) {
-            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(
-              DefaultSourceStaticUtils.getByteValue(field,
-                value.toString), null))
-          }
-          val byteValue =
-            DefaultSourceStaticUtils.getByteValue(field, value.toString)
-          valueArray += byteValue
-        }
-        new EqualLogicExpression(attr, valueArray.length - 1, false)
-
-      /**
-        * The encoder may split a predicate into multiple byte array boundaries.
-        * Each boundary is mapped into a RowKeyFilter and then unioned by the reduce
-        * operation. If the data type is not supported, b will be None, and
-        * no operation happens on the parentRowKeyFilter.
-        *
-        * Note that because LessThan is not inclusive, the first bound should be exclusive,
-        * which is controlled by inc.
-        *
-        * The other predicates, i.e., GreaterThan/LessThanOrEqual/GreaterThanOrEqual, follow
-        * similar logic.
-        */
-      case LessThan(attr, value) =>
-        val field = catalog.getField(attr)
-        if (field != null) {
-          if (field.isRowKey) {
-            val b = encoder.ranges(value)
-            var inc = false
-            b.map(_.less.map { x =>
-              val r = new RowKeyFilter(null,
-                new ScanRange(x.upper, inc, x.low, true)
-              )
-              inc = true
-              r
-            }).map { x =>
-              x.reduce { (i, j) =>
-                i.mergeUnion(j)
-              }
-            }.map(parentRowKeyFilter.mergeIntersect(_))
-          }
-          val byteValue = encoder.encode(field.dt, value)
-          valueArray += byteValue
-        }
-        new LessThanLogicExpression(attr, valueArray.length - 1)
-      case GreaterThan(attr, value) =>
-        val field = catalog.getField(attr)
-        if (field != null) {
-          if (field.isRowKey) {
-            val b = encoder.ranges(value)
-            var inc = false
-            b.map(_.greater.map{x =>
-              val r = new RowKeyFilter(null,
-                new ScanRange(x.upper, true, x.low, inc))
-              inc = true
-              r
-            }).map { x =>
-              x.reduce { (i, j) =>
-                i.mergeUnion(j)
-              }
-            }.map(parentRowKeyFilter.mergeIntersect(_))
-          }
-          val byteValue = encoder.encode(field.dt, value)
-          valueArray += byteValue
-        }
-        new GreaterThanLogicExpression(attr, valueArray.length - 1)
-      case LessThanOrEqual(attr, value) =>
-        val field = catalog.getField(attr)
-        if (field != null) {
-          if (field.isRowKey) {
-            val b = encoder.ranges(value)
-            b.map(_.less.map(x =>
-              new RowKeyFilter(null,
-                new ScanRange(x.upper, true, x.low, true))))
-              .map { x =>
-                x.reduce{ (i, j) =>
-                  i.mergeUnion(j)
-                }
-              }.map(parentRowKeyFilter.mergeIntersect(_))
-          }
-          val byteValue = encoder.encode(field.dt, value)
-          valueArray += byteValue
-        }
-        new LessThanOrEqualLogicExpression(attr, valueArray.length - 1)
-      case GreaterThanOrEqual(attr, value) =>
-        val field = catalog.getField(attr)
-        if (field != null) {
-          if (field.isRowKey) {
-            val b = encoder.ranges(value)
-            b.map(_.greater.map(x =>
-              new RowKeyFilter(null,
-                new ScanRange(x.upper, true, x.low, true))))
-              .map { x =>
-                x.reduce { (i, j) =>
-                  i.mergeUnion(j)
-                }
-              }.map(parentRowKeyFilter.mergeIntersect(_))
-          }
-          val byteValue = encoder.encode(field.dt, value)
-          valueArray += byteValue
-        }
-        new GreaterThanOrEqualLogicExpression(attr, valueArray.length - 1)
-      case Or(left, right) =>
-        val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
-        val rightSideRowKeyFilter = new RowKeyFilter
-        val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
-
-        parentRowKeyFilter.mergeUnion(rightSideRowKeyFilter)
-
-        new OrLogicExpression(leftExpression, rightExpression)
-      case And(left, right) =>
-
-        val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
-        val rightSideRowKeyFilter = new RowKeyFilter
-        val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
-        parentRowKeyFilter.mergeIntersect(rightSideRowKeyFilter)
-
-        new AndLogicExpression(leftExpression, rightExpression)
-      case IsNull(attr) =>
-        new IsNullLogicExpression(attr, false)
-      case IsNotNull(attr) =>
-        new IsNullLogicExpression(attr, true)
-      case _ =>
-        new PassThroughLogicExpression
-    }
-  }
-}
-
-/**
- * Construct to contain a single scan range's information.  Also
- * provides functions to merge with other scan ranges through AND
- * or OR operators.
- *
- * @param upperBound          Upper bound of scan
- * @param isUpperBoundEqualTo Include upper bound value in the results
- * @param lowerBound          Lower bound of scan
- * @param isLowerBoundEqualTo Include lower bound value in the results
- */
-@InterfaceAudience.Private
-class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
-                var lowerBound:Array[Byte], var isLowerBoundEqualTo:Boolean)
-  extends Serializable {
-
-  /**
-   * Function to merge another scan range through an AND operation
-   *
-   * @param other Other scan range
-   */
-  def mergeIntersect(other:ScanRange): Unit = {
-    val upperBoundCompare = compareRange(upperBound, other.upperBound)
-    val lowerBoundCompare = compareRange(lowerBound, other.lowerBound)
-
-    upperBound = if (upperBoundCompare <0) upperBound else other.upperBound
-    lowerBound = if (lowerBoundCompare >0) lowerBound else other.lowerBound
-
-    isLowerBoundEqualTo = if (lowerBoundCompare == 0)
-      isLowerBoundEqualTo && other.isLowerBoundEqualTo
-    else isLowerBoundEqualTo
-
-    isUpperBoundEqualTo = if (upperBoundCompare == 0)
-      isUpperBoundEqualTo && other.isUpperBoundEqualTo
-    else isUpperBoundEqualTo
-  }
-
-  /**
-   * Function to merge another scan range through an OR operation
-   *
-   * @param other Other scan range
-   */
-  def mergeUnion(other:ScanRange): Unit = {
-
-    val upperBoundCompare = compareRange(upperBound, other.upperBound)
-    val lowerBoundCompare = compareRange(lowerBound, other.lowerBound)
-
-    upperBound = if (upperBoundCompare >0) upperBound else other.upperBound
-    lowerBound = if (lowerBoundCompare <0) lowerBound else other.lowerBound
-
-    isLowerBoundEqualTo = if (lowerBoundCompare == 0)
-      isLowerBoundEqualTo || other.isLowerBoundEqualTo
-    else if (lowerBoundCompare < 0) isLowerBoundEqualTo else other.isLowerBoundEqualTo
-
-    isUpperBoundEqualTo = if (upperBoundCompare == 0)
-      isUpperBoundEqualTo || other.isUpperBoundEqualTo
-    else if (upperBoundCompare < 0) other.isUpperBoundEqualTo else isUpperBoundEqualTo
-  }
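
A minimal sketch of the two merge operations above, assuming string row keys encoded with Bytes.toBytes (the ranges a and b are hypothetical):

  import org.apache.hadoop.hbase.util.Bytes

  val a = new ScanRange(Bytes.toBytes("f"), false, Bytes.toBytes("b"), true) // ["b","f")
  val b = new ScanRange(Bytes.toBytes("h"), true,  Bytes.toBytes("d"), true) // ["d","h"]

  a.mergeIntersect(b)  // AND: a now covers ["d","f")
  // a fresh pair merged with mergeUnion would instead cover ["b","h"]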
-
-  /**
-   * Common function to see if this scan overlaps with another
-   *
-   * Reference Visual
-   *
-   * A                           B
-   * |---------------------------|
-   *   LL--------------LU
-   *        RL--------------RU
-   *
-   * A = lowest value is byte[0]
-   * B = highest value is null
-   * LL = Left Lower Bound
-   * LU = Left Upper Bound
-   * RL = Right Lower Bound
-   * RU = Right Upper Bound
-   *
-   * @param other Other scan object
-   * @return      The overlapping ScanRange, or null if the two ranges do not overlap
-   */
-  def getOverLapScanRange(other:ScanRange): ScanRange = {
-
-    var leftRange:ScanRange = null
-    var rightRange:ScanRange = null
-
-    // First identify the Left range
-    // Also lower bound can't be null
-    if (compareRange(lowerBound, other.lowerBound) < 0 ||
-      compareRange(upperBound, other.upperBound) < 0) {
-      leftRange = this
-      rightRange = other
-    } else {
-      leftRange = other
-      rightRange = this
-    }
-
-    if (hasOverlap(leftRange, rightRange)) {
-      // Find the upper bound and lower bound
-      if (compareRange(leftRange.upperBound, rightRange.upperBound) >= 0) {
-        new ScanRange(rightRange.upperBound, rightRange.isUpperBoundEqualTo,
-          rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
-      } else {
-        new ScanRange(leftRange.upperBound, leftRange.isUpperBoundEqualTo,
-          rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
-      }
-    } else {
-      null
-    }
-  }
-
-  /**
-    * The leftRange.upperBound has to be larger than or equal to the
-    * rightRange's lowerBound.  Otherwise, there is no overlap.
-    *
-    * @param left  The range with the smaller lower bound
-    * @param right The range with the larger lower bound
-    * @return Whether two ranges have overlap.
-    */
-
-  def hasOverlap(left: ScanRange, right: ScanRange): Boolean = {
-    compareRange(left.upperBound, right.lowerBound) >= 0
-  }
-
-  /**
-   * Special compare logic because we can have null values
-   * for left or right bound
-   *
-   * @param left  Left byte array
-   * @param right Right byte array
-   * @return      0 if equal, 1 if left is greater, and -1 if right is greater
-   *              (a null bound is treated as the highest possible value)
-   */
-  def compareRange(left:Array[Byte], right:Array[Byte]): Int = {
-    if (left == null && right == null) 0
-    else if (left == null && right != null) 1
-    else if (left != null && right == null) -1
-    else Bytes.compareTo(left, right)
-  }
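
An illustration of the null handling above (a null bound stands for the unbounded end of a range, matching the reference visual where B, the highest value, is null):

  import org.apache.hadoop.hbase.util.Bytes

  val full = new ScanRange(null, true, new Array[Byte](0), true)   // whole-table range
  full.compareRange(null, Bytes.toBytes("zzz"))                    // 1: null sorts last
  full.compareRange(Bytes.toBytes("a"), null)                      // -1
  full.compareRange(Bytes.toBytes("a"), Bytes.toBytes("a"))        // 0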
-
-  /**
-   * Check whether a given point falls within this scan range
-   *
-   * @param point The point to check
-   * @return      True if the point is within the range, false otherwise
-   */
-  def containsPoint(point:Array[Byte]): Boolean = {
-    val lowerCompare = compareRange(point, lowerBound)
-    val upperCompare = compareRange(point, upperBound)
-
-    ((isLowerBoundEqualTo && lowerCompare >= 0) ||
-      (!isLowerBoundEqualTo && lowerCompare > 0)) &&
-      ((isUpperBoundEqualTo && upperCompare <= 0) ||
-        (!isUpperBoundEqualTo && upperCompare < 0))
-
-  }
-  override def toString:String = {
-    "ScanRange:(upperBound:" + Bytes.toString(upperBound) +
-      ",isUpperBoundEqualTo:" + isUpperBoundEqualTo + ",lowerBound:" +
-      Bytes.toString(lowerBound) + ",isLowerBoundEqualTo:" + isLowerBoundEqualTo + ")"
-  }
-}
-
-/**
- * Contains information related to the filters for a given column.
- * This can contain many ranges or points.
- *
- * @param currentPoint the initial point when the filter is created
- * @param currentRange the initial scanRange when the filter is created
- */
-@InterfaceAudience.Private
-class ColumnFilter (currentPoint:Array[Byte] = null,
-                     currentRange:ScanRange = null,
-                     var points:mutable.MutableList[Array[Byte]] =
-                     new mutable.MutableList[Array[Byte]](),
-                     var ranges:mutable.MutableList[ScanRange] =
-                     new mutable.MutableList[ScanRange]() ) extends Serializable {
-  //Collection of ranges
-  if (currentRange != null ) ranges.+=(currentRange)
-
-  //Collection of points
-  if (currentPoint != null) points.+=(currentPoint)
-
-  /**
-   * This will validate a given value against the filter's points and/or ranges;
-   * the result indicates whether the value passes the filter
-   *
-   * @param value       Value to be validated
-   * @param valueOffSet The offset of the value
-   * @param valueLength The length of the value
-   * @return            True if the value passes the filter, false if not
-   */
-  def validate(value:Array[Byte], valueOffSet:Int, valueLength:Int):Boolean = {
-    var result = false
-
-    points.foreach( p => {
-      if (Bytes.equals(p, 0, p.length, value, valueOffSet, valueLength)) {
-        result = true
-      }
-    })
-
-    ranges.foreach( r => {
-      val upperBoundPass = r.upperBound == null ||
-        (r.isUpperBoundEqualTo &&
-          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
-            value, valueOffSet, valueLength) >= 0) ||
-        (!r.isUpperBoundEqualTo &&
-          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
-            value, valueOffSet, valueLength) > 0)
-
-      val lowerBoundPass = r.lowerBound == null || r.lowerBound.length == 0 ||
-        (r.isLowerBoundEqualTo &&
-          Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
-            value, valueOffSet, valueLength) <= 0) ||
-        (!r.isLowerBoundEqualTo &&
-          Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
-            value, valueOffSet, valueLength) < 0)
-
-      result = result || (upperBoundPass && lowerBoundPass)
-    })
-    result
-  }
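
As a rough usage sketch, a hypothetical filter that accepts the exact value "a" or anything in the inclusive range ["m", "p"]:

  import org.apache.hadoop.hbase.util.Bytes

  val filter = new ColumnFilter(
    currentPoint = Bytes.toBytes("a"),
    currentRange = new ScanRange(Bytes.toBytes("p"), true, Bytes.toBytes("m"), true))

  val v = Bytes.toBytes("n")
  filter.validate(v, 0, v.length)   // true: "n" falls inside ["m", "p"]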
-
-  /**
-   * This will allow us to merge filter logic that is joined to the existing filter
-   * through an OR operator
-   *
-   * @param other Filter to merge
-   */
-  def mergeUnion(other:ColumnFilter): Unit = {
-    other.points.foreach( p => points += p)
-
-    other.ranges.foreach( otherR => {
-      var doesOverLap = false
-      ranges.foreach{ r =>
-        if (r.getOverLapScanRange(otherR) != null) {
-          r.mergeUnion(otherR)
-          doesOverLap = true
-        }}
-      if (!doesOverLap) ranges.+=(otherR)
-    })
-  }
-
-  /**
-   * This will allow us to merge filter logic that is joined to the existing filter
-   * through an AND operator
-   *
-   * @param other Filter to merge
-   */
-  def mergeIntersect(other:ColumnFilter): Unit = {
-    val survivingPoints = new mutable.MutableList[Array[Byte]]()
-    points.foreach( p => {
-      other.points.foreach( otherP => {
-        if (Bytes.equals(p, otherP)) {
-          survivingPoints.+=(p)
-        }
-      })
-    })
-    points = survivingPoints
-
-    val survivingRanges = new mutable.MutableList[ScanRange]()
-
-    other.ranges.foreach( otherR => {
-      ranges.foreach( r => {
-        if (r.getOverLapScanRange(otherR) != null) {
-          r.mergeIntersect(otherR)
-          survivingRanges += r
-        }
-      })
-    })
-    ranges = survivingRanges
-  }
-
-  override def toString:String = {
-    val strBuilder = new StringBuilder
-    strBuilder.append("(points:(")
-    var isFirst = true
-    points.foreach( p => {
-      if (isFirst) isFirst = false
-      else strBuilder.append(",")
-      strBuilder.append(Bytes.toString(p))
-    })
-    strBuilder.append("),ranges:")
-    isFirst = true
-    ranges.foreach( r => {
-      if (isFirst) isFirst = false
-      else strBuilder.append(",")
-      strBuilder.append(r)
-    })
-    strBuilder.append("))")
-    strBuilder.toString()
-  }
-}
-
-/**
- * A collection of ColumnFilters indexed by column names.
- *
- * Also contains merge commands that will consolidate the filters
- * per column name
- */
-@InterfaceAudience.Private
-class ColumnFilterCollection {
-  val columnFilterMap = new mutable.HashMap[String, ColumnFilter]
-
-  def clear(): Unit = {
-    columnFilterMap.clear()
-  }
-
-  /**
-   * This will allow us to merge filter logic that is joined to the existing filter
-   * through an OR operator.  This will merge a single column's filter
-   *
-   * @param column The column to be merged
-   * @param other  The other ColumnFilter object to merge
-   */
-  def mergeUnion(column:String, other:ColumnFilter): Unit = {
-    val existingFilter = columnFilterMap.get(column)
-    if (existingFilter.isEmpty) {
-      columnFilterMap.+=((column, other))
-    } else {
-      existingFilter.get.mergeUnion(other)
-    }
-  }
-
-  /**
-   * This will allow us to merge all filters in the existing collection
-   * to the filters in the other collection.  All merges are done as a result
-   * of an OR operator
-   *
-   * @param other The other Column Filter Collection to be merged
-   */
-  def mergeUnion(other:ColumnFilterCollection): Unit = {
-    other.columnFilterMap.foreach( e => {
-      mergeUnion(e._1, e._2)
-    })
-  }
-
-  /**
-   * This will allow us to merge all filters in the existing collection
-   * to the filters in the other collection.  All merges are done as a result
-   * of an AND operator
-   *
-   * @param other The other ColumnFilterCollection to be merged
-   */
-  def mergeIntersect(other:ColumnFilterCollection): Unit = {
-    other.columnFilterMap.foreach( e => {
-      val existingColumnFilter = columnFilterMap.get(e._1)
-      if (existingColumnFilter.isEmpty) {
-        columnFilterMap += e
-      } else {
-        existingColumnFilter.get.mergeIntersect(e._2)
-      }
-    })
-  }
-
-  override def toString:String = {
-    val strBuilder = new StringBuilder
-    columnFilterMap.foreach( e => strBuilder.append(e))
-    strBuilder.toString()
-  }
-}
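
A brief sketch of how OR-ed filters for the same column are consolidated (the column name "col1" and the point values are hypothetical):

  import org.apache.hadoop.hbase.util.Bytes

  val filters = new ColumnFilterCollection
  filters.mergeUnion("col1", new ColumnFilter(Bytes.toBytes("a")))
  filters.mergeUnion("col1", new ColumnFilter(Bytes.toBytes("b")))
  // columnFilterMap now holds one ColumnFilter for "col1" with points "a" and "b"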
-
-/**
- * Status object to store static functions but also to hold last executed
- * information that can be used for unit testing.
- */
-@InterfaceAudience.Private
-object DefaultSourceStaticUtils {
-
-  val rawInteger = new RawInteger
-  val rawLong = new RawLong
-  val rawFloat = new RawFloat
-  val rawDouble = new RawDouble
-  val rawString = RawString.ASCENDING
-
-  val byteRange = new ThreadLocal[PositionedByteRange] {
-    override def initialValue(): PositionedByteRange = {
-      val range = new SimplePositionedMutableByteRange()
-      range.setOffset(0)
-      range.setPosition(0)
-    }
-  }
-
-  def getFreshByteRange(bytes: Array[Byte]): PositionedByteRange = {
-    getFreshByteRange(bytes, 0, bytes.length)
-  }
-
-  def getFreshByteRange(bytes: Array[Byte], offset: Int = 0, length: Int):
-  PositionedByteRange = {
-    byteRange.get().set(bytes).setLength(length).setOffset(offset)
-  }
-
-  // This will contain the last 5 filters and required fields used in buildScan.
-  // These values can be used in unit testing to make sure we are converting
-  // the Spark SQL input correctly.
-  val lastFiveExecutionRules =
-    new ConcurrentLinkedQueue[ExecutionRuleForUnitTesting]()
-
-  /**
-   * This method is to populate the lastFiveExecutionRules for unit test purposes.
-   * This method is not thread safe.
-   *
-   * @param rowKeyFilter           The rowKey Filter logic used in the last query
-   * @param dynamicLogicExpression The dynamicLogicExpression used in the last query
-   */
-  def populateLatestExecutionRules(rowKeyFilter: RowKeyFilter,
-                                   dynamicLogicExpression: DynamicLogicExpression): Unit = {
-    lastFiveExecutionRules.add(new ExecutionRuleForUnitTesting(
-      rowKeyFilter, dynamicLogicExpression))
-    while (lastFiveExecutionRules.size() > 5) {
-      lastFiveExecutionRules.poll()
-    }
-  }
-
-  /**
-   * This method will convert the result content from HBase into the
-   * SQL value type that is requested by the Spark SQL schema definition
-   *
-   * @param field  The structure of the SparkSQL Column
-   * @param r      The result object from HBase
-   * @return       The converted object type
-   */
-  def getValue(field: Field,
-      r: Result): Any = {
-    if (field.isRowKey) {
-      val row = r.getRow
-
-      field.dt match {
-        case IntegerType => rawInteger.decode(getFreshByteRange(row))
-        case LongType => rawLong.decode(getFreshByteRange(row))
-        case FloatType => rawFloat.decode(getFreshByteRange(row))
-        case DoubleType => rawDouble.decode(getFreshByteRange(row))
-        case StringType => rawString.decode(getFreshByteRange(row))
-        case TimestampType => rawLong.decode(getFreshByteRange(row))
-        case _ => Bytes.toString(row)
-      }
-    } else {
-      val cellByteValue =
-        r.getColumnLatestCell(field.cfBytes, field.colBytes)
-      if (cellByteValue == null) null
-      else field.dt match {
-        case IntegerType => rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-        case LongType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-        case FloatType => rawFloat.decode(getFreshByteRange(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-        case DoubleType => rawDouble.decode(getFreshByteRange(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-        case StringType => Bytes.toString(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength)
-        case TimestampType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-        case _ => Bytes.toString(cellByteValue.getValueArray,
-          cellByteValue.getValueOffset, cellByteValue.getValueLength)
-      }
-    }
-  }
-
-  /**
-   * This will convert the value from SparkSQL to be stored into HBase using the
-   * correct byte representation
-   *
-   * @param field  The structure of the SparkSQL Column
-   * @param value  String value from SparkSQL
-   * @return       Returns the byte array to go into HBase
-   */
-  def getByteValue(field: Field,
-      value: String): Array[Byte] = {
-    field.dt match {
-      case IntegerType =>
-        val result = new Array[Byte](Bytes.SIZEOF_INT)
-        val localDataRange = getFreshByteRange(result)
-        rawInteger.encode(localDataRange, value.toInt)
-        localDataRange.getBytes
-      case LongType =>
-        val result = new Array[Byte](Bytes.SIZEOF_LONG)
-        val localDataRange = getFreshByteRange(result)
-        rawLong.encode(localDataRange, value.toLong)
-        localDataRange.getBytes
-      case FloatType =>
-        val result = new Array[Byte](Bytes.SIZEOF_FLOAT)
-        val localDataRange = getFreshByteRange(result)
-        rawFloat.encode(localDataRange, value.toFloat)
-        localDataRange.getBytes
-      case DoubleType =>
-        val result = new Array[Byte](Bytes.SIZEOF_DOUBLE)
-        val localDataRange = getFreshByteRange(result)
-        rawDouble.encode(localDataRange, value.toDouble)
-        localDataRange.getBytes
-      case StringType =>
-        Bytes.toBytes(value)
-      case TimestampType =>
-        val result = new Array[Byte](Bytes.SIZEOF_LONG)
-        val localDataRange = getFreshByteRange(result)
-        rawLong.encode(localDataRange, value.toLong)
-        localDataRange.getBytes
-
-      case _ => Bytes.toBytes(value)
-    }
-  }
-}
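
A small sketch of the encode/decode round trip that getByteValue and getValue perform for an IntegerType column, reusing the RawInteger codec and thread-local byte range above:

  import org.apache.hadoop.hbase.util.Bytes

  val buf = new Array[Byte](Bytes.SIZEOF_INT)
  DefaultSourceStaticUtils.rawInteger.encode(
    DefaultSourceStaticUtils.getFreshByteRange(buf), 42)
  DefaultSourceStaticUtils.rawInteger.decode(
    DefaultSourceStaticUtils.getFreshByteRange(buf))   // => 42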
-
-/**
- * Contains information related to the filters for the row key.
- * This can contain many ranges or points.
- *
- * @param currentPoint the initial point when the filter is created
- * @param currentRange the initial scanRange when the filter is created
- */
-@InterfaceAudience.Private
-class RowKeyFilter (currentPoint:Array[Byte] = null,
-                    currentRange:ScanRange =
-                    new ScanRange(null, true, new Array[Byte](0), true),
-                    var points:mutable.MutableList[Array[Byte]] =
-                    new mutable.MutableList[Array[Byte]](),
-                    var ranges:mutable.MutableList[ScanRange] =
-                    new mutable.MutableList[ScanRange]() ) extends Serializable {
-  //Collection of ranges
-  if (currentRange != null ) ranges.+=(currentRange)
-
-  //Collection of points
-  if (currentPoint != null) points.+=(currentPoint)
-
-  /**
-   * This will validate a given value against the filter's points and/or ranges;
-   * the result indicates whether the value passes the filter
-   *
-   * @param value       Value to be validated
-   * @param valueOffSet The offset of the value
-   * @param valueLength The length of the value
-   * @return            True if the value passes the filter, false if not
-   */
-  def validate(value:Array[Byte], valueOffSet:Int, valueLength:Int):Boolean = {
-    var result = false
-
-    points.foreach( p => {
-      if (Bytes.equals(p, 0, p.length, value, valueOffSet, valueLength)) {
-        result = true
-      }
-    })
-
-    ranges.foreach( r => {
-      val upperBoundPass = r.upperBound == null ||
-        (r.isUpperBoundEqualTo &&
-          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
-            value, valueOffSet, valueLength) >= 0) ||
-        (!r.isUpperBoundEqualTo &&
-          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
-            value, valueOffSet, valueLength) > 0)
-
-      val lowerBoundPass = r.lowerBound == null || r.lowerBound.length == 0 ||
-      (r.isLowerBoundEqualTo &&
-        Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
-          value, valueOffSet, valueLength) <= 0) ||
-        (!r.isLowerBoundEqualTo &&
-          Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
-            value, valueOffSet, valueLength) < 0)
-
-      result = result || (upperBoundPass && lowerBoundPass)
-    })
-    result
-  }
-
-  /**
-   * This will allow us to merge filter logic that is joined to the existing filter
-   * through an OR operator
-   *
-   * @param other Filter to merge
-   */
-  def mergeUnion(other:RowKeyFilter): RowKeyFilter = {
-    other.points.foreach( p => points += p)
-
-    other.ranges.foreach( otherR => {
-      var doesOverLap = false
-      ranges.foreach{ r =>
-        if (r.getOverLapScanRange(otherR) != null) {
-          r.mergeUnion(otherR)
-          doesOverLap = true
-        }}
-      if (!doesOverLap) ranges.+=(otherR)
-    })
-    this
-  }
-
-  /**
-   * This will allow us to merge filter logic that is joined to the existing filter
-   * through an AND operator
-   *
-   * @param other Filter to merge
-   */
-  def mergeIntersect(other:RowKeyFilter): RowKeyFilter = {
-    val survivingPoints = new mutable.MutableList[Array[Byte]]()
-    val didntSurviveFirstPassPoints = new mutable.MutableList[Array[Byte]]()
-    if (points == null || points.length == 0) {
-      other.points.foreach( otherP => {
-        didntSurviveFirstPassPoints += otherP
-      })
-    } else {
-      points.foreach(p => {
-        if (other.points.length == 0) {
-          didntSurviveFirstPassPoints += p
-        } else {
-          other.points.foreach(otherP => {
-            if (Bytes.equals(p, otherP)) {
-              survivingPoints += p
-            } else {
-              didntSurviveFirstPassPoints += p
-            }
-          })
-        }
-      })
-    }
-
-    val survivingRanges = new mutable.MutableList[ScanRange]()
-
-    if (ranges.length == 0) {
-      didntSurviveFirstPassPoints.foreach(p => {
-          survivingPoints += p
-      })
-    } else {
-      ranges.foreach(r => {
-        other.ranges.foreach(otherR => {
-          val overLapScanRange = r.getOverLapScanRange(otherR)
-          if (overLapScanRange != null) {
-            survivingRanges += overLapScanRange
-          }
-        })
-        didntSurviveFirstPassPoints.foreach(p => {
-          if (r.containsPoint(p)) {
-            survivingPoints += p
-          }
-        })
-      })
-    }
-    points = survivingPoints
-    ranges = survivingRanges
-    this
-  }
-
-  override def toString:String = {
-    val strBuilder = new StringBuilder
-    strBuilder.append("(points:(")
-    var isFirst = true
-    points.foreach( p => {
-      if (isFirst) isFirst = false
-      else strBuilder.append(",")
-      strBuilder.append(Bytes.toString(p))
-    })
-    strBuilder.append("),ranges:")
-    isFirst = true
-    ranges.foreach( r => {
-      if (isFirst) isFirst = false
-      else strBuilder.append(",")
-      strBuilder.append(r)
-    })
-    strBuilder.append("))")
-    strBuilder.toString()
-  }
-}
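
A rough sketch of AND-ing two hypothetical row key predicates ("key > row005" and "key <= row010") into a single scan range:

  import org.apache.hadoop.hbase.util.Bytes

  val gt = new RowKeyFilter(null,
    new ScanRange(null, true, Bytes.toBytes("row005"), false))
  val le = new RowKeyFilter(null,
    new ScanRange(Bytes.toBytes("row010"), true, new Array[Byte](0), true))

  gt.mergeIntersect(le)
  // gt now holds a single ScanRange covering ("row005", "row010"]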
-
-@InterfaceAudience.Private
-class ExecutionRuleForUnitTesting(val rowKeyFilter: RowKeyFilter,
-                                  val dynamicLogicExpression: DynamicLogicExpression)