Posted to commits@hbase.apache.org by bu...@apache.org on 2015/09/07 16:44:18 UTC

[1/2] hbase git commit: HBASE-14181 Add Spark DataFrame DataSource to HBase-Spark Module

Repository: hbase
Updated Branches:
  refs/heads/master bada19bb5 -> e95358a7f


http://git-wip-us.apache.org/repos/asf/hbase/blob/e95358a7/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
new file mode 100644
index 0000000..d0f1091
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import scala.collection.mutable.MutableList;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This filter will push down all qualifier logic given to us
+ * by SparkSQL so that we can apply the filters at the region server level
+ * and avoid sending the data back to the client to be filtered.
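+ *
+ * <p>A rough usage sketch (mirrors how the Scala DefaultSource attaches this
+ * filter to a Scan; the map contents here are illustrative only):
+ * <pre>{@code
+ *   HashMap<ColumnFamilyQualifierMapKeyWrapper, ColumnFilter> map = new HashMap<>();
+ *   Scan scan = new Scan();
+ *   scan.setFilter(new SparkSQLPushDownFilter(map));
+ * }</pre>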
+ */
+public class SparkSQLPushDownFilter extends FilterBase {
+  protected static final Log log = LogFactory.getLog(SparkSQLPushDownFilter.class);
+
+  HashMap<ColumnFamilyQualifierMapKeyWrapper, ColumnFilter> columnFamilyQualifierFilterMap;
+
+  public SparkSQLPushDownFilter(HashMap<ColumnFamilyQualifierMapKeyWrapper,
+          ColumnFilter> columnFamilyQualifierFilterMap) {
+    this.columnFamilyQualifierFilterMap = columnFamilyQualifierFilterMap;
+  }
+
+  /**
+   * This method will find the related filter logic for the given
+   * column family and qualifier and then execute it.  It will not
+   * clone the incoming cell, to avoid extra object creation.
+   *
+   * @param c            The Cell to be validated
+   * @return             ReturnCode object to determine if skipping is required
+   * @throws IOException
+   */
+  @Override
+  public ReturnCode filterKeyValue(Cell c) throws IOException {
+
+    //Get filter if one exists
+    ColumnFilter filter =
+            columnFamilyQualifierFilterMap.get(new ColumnFamilyQualifierMapKeyWrapper(
+            c.getFamilyArray(),
+            c.getFamilyOffset(),
+            c.getFamilyLength(),
+            c.getQualifierArray(),
+            c.getQualifierOffset(),
+            c.getQualifierLength()));
+
+    if (filter == null) {
+      //If no filter then just include values
+      return ReturnCode.INCLUDE;
+    } else {
+      //If there is a filter then run validation
+      if (filter.validate(c.getValueArray(), c.getValueOffset(), c.getValueLength())) {
+        return ReturnCode.INCLUDE;
+      }
+    }
+    //If validation fails then skip whole row
+    return ReturnCode.NEXT_ROW;
+  }
+
+  /**
+   * @param pbBytes A pb serialized instance
+   * @return An instance of SparkSQLPushDownFilter
+   * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
+   */
+  @SuppressWarnings("unused")
+  public static SparkSQLPushDownFilter parseFrom(final byte[] pbBytes)
+          throws DeserializationException {
+
+    HashMap<ColumnFamilyQualifierMapKeyWrapper,
+            ColumnFilter> columnFamilyQualifierFilterMap = new HashMap<>();
+
+    FilterProtos.SQLPredicatePushDownFilter proto;
+    try {
+      proto = FilterProtos.SQLPredicatePushDownFilter.parseFrom(pbBytes);
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+    final List<FilterProtos.SQLPredicatePushDownColumnFilter> columnFilterListList =
+            proto.getColumnFilterListList();
+
+    for (FilterProtos.SQLPredicatePushDownColumnFilter columnFilter: columnFilterListList) {
+
+      byte[] columnFamily = columnFilter.getColumnFamily().toByteArray();
+      byte[] qualifier = columnFilter.getQualifier().toByteArray();
+      final ColumnFamilyQualifierMapKeyWrapper columnFamilyQualifierMapKeyWrapper =
+              new ColumnFamilyQualifierMapKeyWrapper(columnFamily, 0, columnFamily.length,
+              qualifier, 0, qualifier.length);
+
+      final MutableList<byte[]> points = new MutableList<>();
+      final MutableList<ScanRange> scanRanges = new MutableList<>();
+
+      for (ByteString byteString: columnFilter.getGetPointListList()) {
+        points.$plus$eq(byteString.toByteArray());
+      }
+
+      for (FilterProtos.RowRange rowRange: columnFilter.getRangeListList()) {
+        ScanRange scanRange = new ScanRange(rowRange.getStopRow().toByteArray(),
+                rowRange.getStopRowInclusive(),
+                rowRange.getStartRow().toByteArray(),
+                rowRange.getStartRowInclusive());
+        scanRanges.$plus$eq(scanRange);
+      }
+
+      final ColumnFilter columnFilterObj = new ColumnFilter(null, null, points, scanRanges);
+
+      columnFamilyQualifierFilterMap.put(columnFamilyQualifierMapKeyWrapper, columnFilterObj);
+    }
+
+    return new SparkSQLPushDownFilter(columnFamilyQualifierFilterMap);
+  }
+
+  /**
+   * @return The filter serialized using pb
+   */
+  public byte[] toByteArray() {
+
+    FilterProtos.SQLPredicatePushDownFilter.Builder builder =
+            FilterProtos.SQLPredicatePushDownFilter.newBuilder();
+
+    FilterProtos.SQLPredicatePushDownColumnFilter.Builder columnBuilder =
+            FilterProtos.SQLPredicatePushDownColumnFilter.newBuilder();
+
+    FilterProtos.RowRange.Builder rowRangeBuilder = FilterProtos.RowRange.newBuilder();
+
+    for (Map.Entry<ColumnFamilyQualifierMapKeyWrapper, ColumnFilter> entry :
+            columnFamilyQualifierFilterMap.entrySet()) {
+
+      columnBuilder.setColumnFamily(
+              ByteStringer.wrap(entry.getKey().cloneColumnFamily()));
+      columnBuilder.setQualifier(
+              ByteStringer.wrap(entry.getKey().cloneQualifier()));
+
+      final MutableList<byte[]> points = entry.getValue().points();
+
+      int pointLength = points.length();
+      for (int i = 0; i < pointLength; i++) {
+        byte[] point = points.get(i).get();
+        columnBuilder.addGetPointList(ByteStringer.wrap(point));
+
+      }
+
+      final MutableList<ScanRange> ranges = entry.getValue().ranges();
+      int rangeLength = ranges.length();
+      for (int i = 0; i < rangeLength; i++) {
+        ScanRange scanRange = ranges.get(i).get();
+        rowRangeBuilder.clear();
+        rowRangeBuilder.setStartRow(ByteStringer.wrap(scanRange.lowerBound()));
+        rowRangeBuilder.setStopRow(ByteStringer.wrap(scanRange.upperBound()));
+        rowRangeBuilder.setStartRowInclusive(scanRange.isLowerBoundEqualTo());
+        rowRangeBuilder.setStopRowInclusive(scanRange.isUpperBoundEqualTo());
+
+        columnBuilder.addRangeList(rowRangeBuilder.build());
+      }
+
+      builder.addColumnFilterList(columnBuilder.build());
+      columnBuilder.clear();
+    }
+    return builder.build().toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e95358a7/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala
new file mode 100644
index 0000000..f223d1b
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * A wrapper class that allows both columnFamily and qualifier to
+ * be used together as the key of a HashMap.  It also allows the value to be
+ * found in a HashMap without cloning the HBase value from the HBase Cell object.
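+ *
+ * A minimal sketch of the intended use, mirroring how DefaultSource builds its
+ * filter map (family, qualifier, and filter values are illustrative):
+ * {{{
+ *   val filterMap = new java.util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter]()
+ *   val cf = Bytes.toBytes("c")
+ *   val q = Bytes.toBytes("a")
+ *   filterMap.put(
+ *     new ColumnFamilyQualifierMapKeyWrapper(cf, 0, cf.length, q, 0, q.length),
+ *     new ColumnFilter(Bytes.toBytes("foo1")))
+ * }}}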
+ * @param columnFamily       ColumnFamily byte array
+ * @param columnFamilyOffSet Offset of columnFamily value in the array
+ * @param columnFamilyLength Length of the columnFamily value in the columnFamily array
+ * @param qualifier          Qualifier byte array
+ * @param qualifierOffSet    Offset of qualifier value in the array
+ * @param qualifierLength    Length of the qualifier value within the array
+ */
+class ColumnFamilyQualifierMapKeyWrapper(val columnFamily:Array[Byte],
+                                         val columnFamilyOffSet:Int,
+                                         val columnFamilyLength:Int,
+                                         val qualifier:Array[Byte],
+                                         val qualifierOffSet:Int,
+                                         val qualifierLength:Int)
+  extends Serializable{
+
+  override def equals(other:Any): Boolean = {
+    val otherWrapper = other.asInstanceOf[ColumnFamilyQualifierMapKeyWrapper]
+
+    Bytes.compareTo(columnFamily,
+      columnFamilyOffSet,
+      columnFamilyLength,
+      otherWrapper.columnFamily,
+      otherWrapper.columnFamilyOffSet,
+      otherWrapper.columnFamilyLength) == 0 && Bytes.compareTo(qualifier,
+        qualifierOffSet,
+        qualifierLength,
+        otherWrapper.qualifier,
+        otherWrapper.qualifierOffSet,
+        otherWrapper.qualifierLength) == 0
+  }
+
+  override def hashCode():Int = {
+    Bytes.hashCode(columnFamily, columnFamilyOffSet, columnFamilyLength) +
+      Bytes.hashCode(qualifier, qualifierOffSet, qualifierLength)
+  }
+
+  def cloneColumnFamily():Array[Byte] = {
+    val resultArray = new Array[Byte](columnFamilyLength)
+    System.arraycopy(columnFamily, columnFamilyOffSet, resultArray, 0, columnFamilyLength)
+    resultArray
+  }
+
+  def cloneQualifier():Array[Byte] = {
+    val resultArray = new Array[Byte](qualifierLength)
+    System.arraycopy(qualifier, qualifierOffSet, resultArray, 0, qualifierLength)
+    resultArray
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e95358a7/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
new file mode 100644
index 0000000..a180de2
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -0,0 +1,982 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.util
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result, Scan}
+import org.apache.hadoop.hbase.types._
+import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange, PositionedByteRange, Bytes}
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import scala.collection.mutable
+
+/**
+ * DefaultSource for integration with Spark's dataframe datasources.
+ * This class will produce a BaseRelation based on the input given to it from Spark.
+ *
+ * In all, this DefaultSource supports the following datasource functionality:
+ * - Scan range pruning through filter push down logic based on rowKeys
+ * - Filter push down logic on HBase Cells
+ * - Qualifier filtering based on columns used in the SparkSQL statement
+ * - Type conversions of basic SQL types.  All conversions are done
+ *   through the HBase Bytes object commands.
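+ *
+ * A usage sketch, mirroring the unit tests below (table name and column
+ * mapping are illustrative):
+ * {{{
+ *   val df = sqlContext.load("org.apache.hadoop.hbase.spark",
+ *     Map("hbase.columns.mapping" -> "KEY_FIELD STRING :key, A_FIELD STRING c:a",
+ *       "hbase.table" -> "t1"))
+ *   df.registerTempTable("hbaseTmp")
+ * }}}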
+ */
+class DefaultSource extends RelationProvider {
+
+  val TABLE_KEY:String = "hbase.table"
+  val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping"
+  val BATCHING_NUM_KEY:String = "hbase.batching.num"
+  val CACHING_NUM_KEY:String = "hbase.caching.num"
+  val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources"
+  val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context"
+
+  /**
+   * Given input from SparkSQL, this will construct a BaseRelation.
+   * @param sqlContext SparkSQL context
+   * @param parameters Parameters given to us from SparkSQL
+   * @return           A BaseRelation Object
+   */
+  override def createRelation(sqlContext: SQLContext,
+                              parameters: Map[String, String]):
+  BaseRelation = {
+
+
+    val tableName = parameters.get(TABLE_KEY)
+    if (tableName.isEmpty)
+      throw new IllegalArgumentException(
+        "Invalid value for " + TABLE_KEY + " '" + tableName + "'")
+
+    val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "")
+    val batchingNumStr = parameters.getOrElse(BATCHING_NUM_KEY, "1000")
+    val cachingNumStr = parameters.getOrElse(CACHING_NUM_KEY, "1000")
+    val hbaseConfigResources = parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "")
+    val useHBaseContext = parameters.getOrElse(USE_HBASE_CONTEXT, "true")
+
+    val batchingNum:Int = try {
+      batchingNumStr.toInt
+    } catch {
+      case e:NumberFormatException => throw
+        new IllegalArgumentException("Invalid value for " + BATCHING_NUM_KEY +" '"
+            + batchingNumStr + "'", e)
+    }
+
+    val cachingNum:Int = try {
+      cachingNumStr.toInt
+    } catch {
+      case e:NumberFormatException => throw
+        new IllegalArgumentException("Invalid value for " + CACHING_NUM_KEY +" '"
+            + cachingNumStr + "'", e)
+    }
+
+    new HBaseRelation(tableName.get,
+      generateSchemaMappingMap(schemaMappingString),
+      batchingNum,
+      cachingNum,
+      hbaseConfigResources,
+      useHBaseContext.equalsIgnoreCase("true"))(sqlContext)
+  }
+
+  /**
+   * Reads the SCHEMA_COLUMNS_MAPPING_KEY and converts it to a map of
+   * SchemaQualifierDefinitions with the original sql column name as the key
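+   *
+   * For example, with the mapping string used in the unit tests below:
+   * {{{
+   *   generateSchemaMappingMap("KEY_FIELD STRING :key, A_FIELD STRING c:a")
+   *   // KEY_FIELD -> SchemaQualifierDefinition("KEY_FIELD", "STRING", "", "key")
+   *   // A_FIELD   -> SchemaQualifierDefinition("A_FIELD", "STRING", "c", "a")
+   * }}}
+   *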
+   * @param schemaMappingString The schema mapping string from the SparkSQL map
+   * @return                    A map of definitions keyed by the SparkSQL column name
+   */
+  def generateSchemaMappingMap(schemaMappingString:String):
+  java.util.HashMap[String, SchemaQualifierDefinition] = {
+    try {
+      val columnDefinitions = schemaMappingString.split(',')
+      val resultingMap = new java.util.HashMap[String, SchemaQualifierDefinition]()
+      columnDefinitions.map(cd => {
+        val parts = cd.trim.split(' ')
+
+        //Make sure we get three parts
+        //<ColumnName> <ColumnType> <ColumnFamily:Qualifier>
+        if (parts.length == 3) {
+          val hbaseDefinitionParts = if (parts(2).charAt(0) == ':') {
+            Array[String]("", "key")
+          } else {
+            parts(2).split(':')
+          }
+          resultingMap.put(parts(0), new SchemaQualifierDefinition(parts(0),
+            parts(1), hbaseDefinitionParts(0), hbaseDefinitionParts(1)))
+        } else {
+          throw new IllegalArgumentException("Invalid value for schema mapping '" + cd +
+            "' should be '<columnName> <columnType> <columnFamily>:<qualifier>' " +
+            "for columns and '<columnName> <columnType> :<qualifier>' for rowKeys")
+        }
+      })
+      resultingMap
+    } catch {
+      case e:Exception => throw
+        new IllegalArgumentException("Invalid value for " + SCHEMA_COLUMNS_MAPPING_KEY +
+          " '" + schemaMappingString + "'", e )
+    }
+  }
+}
+
+/**
+ * Implementation of Spark BaseRelation that will build up our scan logic,
+ * do the scan pruning, filter push down, and value conversions
+ *
+ * @param tableName               HBase table that we plan to read from
+ * @param schemaMappingDefinition SchemaMapping information to map HBase
+ *                                Qualifiers to SparkSQL columns
+ * @param batchingNum             The batching number to be applied to the
+ *                                scan object
+ * @param cachingNum              The caching number to be applied to the
+ *                                scan object
+ * @param configResources         Optional comma separated list of config resources
+ *                                to get based on their URI
+ * @param useHBaseContext         If true this will look to see if
+ *                                LatestHBaseContextCache.latest is populated to use that
+ *                                connection information
+ * @param sqlContext              SparkSQL context
+ */
+class HBaseRelation (val tableName:String,
+                     val schemaMappingDefinition:
+                     java.util.HashMap[String, SchemaQualifierDefinition],
+                     val batchingNum:Int,
+                     val cachingNum:Int,
+                     val configResources:String,
+                     val useHBaseContext:Boolean) (
+  @transient val sqlContext:SQLContext)
+  extends BaseRelation with PrunedFilteredScan with Logging {
+
+  //create or get latest HBaseContext
+  @transient val hbaseContext:HBaseContext = if (useHBaseContext) {
+    LatestHBaseContextCache.latest
+  } else {
+    val config = HBaseConfiguration.create()
+    configResources.split(",").foreach( r => config.addResource(r))
+    new HBaseContext(sqlContext.sparkContext, config)
+  }
+
+  /**
+   * Generates a Spark SQL schema object so Spark SQL knows what is being
+   * provided by this BaseRelation
+   *
+   * @return schema generated from the SCHEMA_COLUMNS_MAPPING_KEY value
+   */
+  override def schema: StructType = {
+
+    val metadataBuilder = new MetadataBuilder()
+
+    val structFieldArray = new Array[StructField](schemaMappingDefinition.size())
+
+    val schemaMappingDefinitionIt = schemaMappingDefinition.values().iterator()
+    var indexCounter = 0
+    while (schemaMappingDefinitionIt.hasNext) {
+      val c = schemaMappingDefinitionIt.next()
+
+      val metadata = metadataBuilder.putString("name", c.columnName).build()
+      val structField =
+        new StructField(c.columnName, c.columnSparkSqlType, nullable = true, metadata)
+
+      structFieldArray(indexCounter) = structField
+      indexCounter += 1
+    }
+
+    val result = new StructType(structFieldArray)
+    result
+  }
+
+  /**
+   * Here we are building the functionality to populate the resulting RDD[Row].
+   * This is where we will do the following:
+   * - Filter push down
+   * - Scan or GetList pruning
+   * - Executing our scan(s) and/or GetList to generate the result
+   *
+   * @param requiredColumns The columns that are being requested by the requesting query
+   * @param filters         The filters that are being applied by the requesting query
+   * @return                RDD with all the results from HBase needed for SparkSQL to
+   *                        execute the query
+   */
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+
+    val columnFilterCollection = buildColumnFilterCollection(filters)
+
+    val requiredQualifierDefinitionArray = new mutable.MutableList[SchemaQualifierDefinition]
+    requiredColumns.foreach( c => {
+      val definition = schemaMappingDefinition.get(c)
+      if (definition.columnFamilyBytes.length > 0) {
+        requiredQualifierDefinitionArray += definition
+      }
+    })
+
+    //Create a local variable so that Spark doesn't have to
+    // serialize the whole HBaseRelation object
+    val serializableDefinitionMap = schemaMappingDefinition
+
+
+    //retain the information for unit testing checks
+    DefaultSourceStaticUtils.populateLatestExecutionRules(columnFilterCollection,
+      requiredQualifierDefinitionArray)
+    var resultRDD: RDD[Row] = null
+
+    if (columnFilterCollection != null) {
+      val pushDownFilterJava =
+        new SparkSQLPushDownFilter(
+          columnFilterCollection.generateFamilyQualifiterFilterMap(schemaMappingDefinition))
+
+      val getList = new util.ArrayList[Get]()
+      val rddList = new util.ArrayList[RDD[Row]]()
+
+      val it = columnFilterCollection.columnFilterMap.iterator
+      while (it.hasNext) {
+        val e = it.next()
+        val columnDefinition = schemaMappingDefinition.get(e._1)
+        //check if this is a rowKey column
+        if (columnDefinition != null && columnDefinition.columnFamily.isEmpty) {
+          //add points to getList
+          e._2.points.foreach(p => {
+            val get = new Get(p)
+            requiredQualifierDefinitionArray.foreach( d =>
+              get.addColumn(d.columnFamilyBytes, d.qualifierBytes))
+            getList.add(get)
+          })
+
+          val rangeIt = e._2.ranges.iterator
+
+          while (rangeIt.hasNext) {
+            val r = rangeIt.next()
+
+            val scan = new Scan()
+            scan.setBatch(batchingNum)
+            scan.setCaching(cachingNum)
+            requiredQualifierDefinitionArray.foreach( d =>
+              scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
+
+            if (pushDownFilterJava.columnFamilyQualifierFilterMap.size() > 0) {
+              scan.setFilter(pushDownFilterJava)
+            }
+
+            //Check if there is a lower bound
+            if (r.lowerBound != null && r.lowerBound.length > 0) {
+
+              if (r.isLowerBoundEqualTo) {
+                //HBase startRow is inclusive: therefore it acts like isLowerBoundEqualTo
+                // by default
+                scan.setStartRow(r.lowerBound)
+              } else {
+                //Since we don't want equalTo we want the next value, so we need
+                // to add another byte to the start key.  That new byte will be
+                // the min byte value.
+                val newArray = new Array[Byte](r.lowerBound.length + 1)
+                System.arraycopy(r.lowerBound, 0, newArray, 0, r.lowerBound.length)
+
+                //new Min Byte
+                newArray(r.lowerBound.length) = Byte.MinValue
+                scan.setStartRow(newArray)
+              }
+            }
+
+            //Check if there is a upperBound
+            if (r.upperBound != null && r.upperBound.length > 0) {
+              if (r.isUpperBoundEqualTo) {
+                //HBase stopRow is exclusive: therefore it DOESN'T act like isUpperBoundEqualTo
+                // by default.  So we need to add a new max byte to the stopRow key
+                val newArray = new Array[Byte](r.upperBound.length + 1)
+                System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length)
+
+                //New Max Bytes
+                newArray(r.upperBound.length) = Byte.MaxValue
+
+                scan.setStopRow(newArray)
+              } else {
+                //Here equalTo is false for Upper bound which is exclusive and
+                // HBase stopRow acts like that by default so no need to mutate the
+                // rowKey
+                scan.setStopRow(r.upperBound)
+              }
+            }
+
+            val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
+              Row.fromSeq(requiredColumns.map(c =>
+                DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r._2)))
+            })
+            rddList.add(rdd)
+          }
+        }
+      }
+
+      //If there is more than one RDD then we have to union them together
+      for (i <- 0 until rddList.size()) {
+        if (resultRDD == null) resultRDD = rddList.get(i)
+        else resultRDD = resultRDD.union(rddList.get(i))
+
+      }
+
+      //If there are gets then we can get them from the driver and union that rdd in
+      // with the rest of the values.
+      if (getList.size() > 0) {
+        val connection = ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration)
+        try {
+          val table = connection.getTable(TableName.valueOf(tableName))
+          try {
+            val results = table.get(getList)
+            val rowList = mutable.MutableList[Row]()
+            for (i <- 0 until results.length) {
+              val rowArray = requiredColumns.map(c =>
+                DefaultSourceStaticUtils.getValue(c, schemaMappingDefinition, results(i)))
+              rowList += Row.fromSeq(rowArray)
+            }
+            val getRDD = sqlContext.sparkContext.parallelize(rowList)
+            if (resultRDD == null) resultRDD = getRDD
+            else {
+              resultRDD = resultRDD.union(getRDD)
+            }
+          } finally {
+            table.close()
+          }
+        } finally {
+          connection.close()
+        }
+      }
+    }
+    if (resultRDD == null) {
+      val scan = new Scan()
+      scan.setBatch(batchingNum)
+      scan.setCaching(cachingNum)
+      requiredQualifierDefinitionArray.foreach( d =>
+        scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
+
+      val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
+        Row.fromSeq(requiredColumns.map(c => DefaultSourceStaticUtils.getValue(c,
+          serializableDefinitionMap, r._2)))
+      })
+      resultRDD=rdd
+    }
+    resultRDD
+  }
+
+  /**
+   * Root recursive function that will loop over the filters provided by
+   * SparkSQL.  Some filters are AND or OR functions and contain additional filters
+   * hence the need for recursion.
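+   *
+   * For example, the predicate from one of the unit tests below,
+   * WHERE KEY_FIELD = 'get1' OR KEY_FIELD = 'get2', arrives roughly as:
+   * {{{
+   *   buildColumnFilterCollection(Array[Filter](
+   *     Or(EqualTo("KEY_FIELD", "get1"), EqualTo("KEY_FIELD", "get2"))))
+   *   // -> one ColumnFilter for KEY_FIELD holding the two points "get1" and "get2"
+   * }}}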
+   *
+   * @param filters Filters provided by SparkSQL.
+   *                Filters are joined with the AND operator
+   * @return        A ColumnFilterCollection which is a consolidated construct to
+   *                hold the high level filter information
+   */
+  def buildColumnFilterCollection(filters: Array[Filter]): ColumnFilterCollection = {
+    var superCollection: ColumnFilterCollection = null
+
+    filters.foreach( f => {
+      val parentCollection = new ColumnFilterCollection
+      buildColumnFilterCollection(parentCollection, f)
+      if (superCollection == null)
+        superCollection = parentCollection
+      else
+        superCollection.mergeIntersect(parentCollection)
+    })
+    superCollection
+  }
+
+  /**
+   * Recursive function that will work to convert Spark Filter
+   * objects to ColumnFilterCollection
+   *
+   * @param parentFilterCollection Parent ColumnFilterCollection
+   * @param filter                 Current given filter from SparkSQL
+   */
+  def buildColumnFilterCollection(parentFilterCollection:ColumnFilterCollection,
+                                  filter:Filter): Unit = {
+    filter match {
+
+      case EqualTo(attr, value) =>
+        parentFilterCollection.mergeUnion(attr,
+          new ColumnFilter(DefaultSourceStaticUtils.getByteValue(attr,
+            schemaMappingDefinition, value.toString)))
+
+      case LessThan(attr, value) =>
+        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
+          new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
+            schemaMappingDefinition, value.toString), false,
+            new Array[Byte](0), true)))
+
+      case GreaterThan(attr, value) =>
+        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
+        new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
+          schemaMappingDefinition, value.toString), false)))
+
+      case LessThanOrEqual(attr, value) =>
+        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
+        new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
+          schemaMappingDefinition, value.toString), true,
+          new Array[Byte](0), true)))
+
+      case GreaterThanOrEqual(attr, value) =>
+        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
+        new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
+          schemaMappingDefinition, value.toString), true)))
+
+      case Or(left, right) =>
+        buildColumnFilterCollection(parentFilterCollection, left)
+        val rightSideCollection = new ColumnFilterCollection
+        buildColumnFilterCollection(rightSideCollection, right)
+        parentFilterCollection.mergeUnion(rightSideCollection)
+      case And(left, right) =>
+        buildColumnFilterCollection(parentFilterCollection, left)
+        val rightSideCollection = new ColumnFilterCollection
+        buildColumnFilterCollection(rightSideCollection, right)
+        parentFilterCollection.mergeIntersect(rightSideCollection)
+      case _ => //nothing
+    }
+  }
+}
+
+/**
+ * Construct to contain column data that spans both SparkSQL and HBase
+ *
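+ * For example, the schema mapping entry "A_FIELD STRING c:a" (as used in the
+ * unit tests below) is parsed into:
+ * {{{
+ *   SchemaQualifierDefinition("A_FIELD", "STRING", "c", "a")
+ * }}}
+ *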
+ * @param columnName   SparkSQL column name
+ * @param colType      SparkSQL column type
+ * @param columnFamily HBase column family
+ * @param qualifier    HBase qualifier name
+ */
+case class SchemaQualifierDefinition(columnName:String,
+                          colType:String,
+                          columnFamily:String,
+                          qualifier:String) extends Serializable {
+  val columnFamilyBytes = Bytes.toBytes(columnFamily)
+  val qualifierBytes = Bytes.toBytes(qualifier)
+  val columnSparkSqlType:DataType = if (colType.equals("BOOLEAN")) BooleanType
+    else if (colType.equals("TINYINT")) IntegerType
+    else if (colType.equals("INT")) IntegerType
+    else if (colType.equals("BIGINT")) LongType
+    else if (colType.equals("FLOAT")) FloatType
+    else if (colType.equals("DOUBLE")) DoubleType
+    else if (colType.equals("STRING")) StringType
+    else if (colType.equals("TIMESTAMP")) TimestampType
+    else if (colType.equals("DECIMAL")) StringType //DataTypes.createDecimalType(precision, scale)
+    else throw new IllegalArgumentException("Unsupported column type :" + colType)
+}
+
+/**
+ * Construct to contain a single scan range's information.  Also
+ * provides functions to merge with other scan ranges through AND
+ * or OR operators
+ *
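+ * For example, intersecting two overlapping inclusive ranges (values shown as
+ * strings for readability; actual bounds are byte arrays):
+ * {{{
+ *   val a = new ScanRange(Bytes.toBytes("get3"), true, Bytes.toBytes("get1"), true)
+ *   val b = new ScanRange(Bytes.toBytes("get5"), true, Bytes.toBytes("get2"), true)
+ *   a.mergeIntersect(b)  // a now spans "get2" (inclusive) to "get3" (inclusive)
+ * }}}
+ *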
+ * @param upperBound          Upper bound of scan
+ * @param isUpperBoundEqualTo Include upper bound value in the results
+ * @param lowerBound          Lower bound of scan
+ * @param isLowerBoundEqualTo Include lower bound value in the results
+ */
+class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
+                var lowerBound:Array[Byte], var isLowerBoundEqualTo:Boolean)
+  extends Serializable {
+
+  /**
+   * Function to merge another scan object through an AND operation
+   * @param other Other scan object
+   */
+  def mergeIntersect(other:ScanRange): Unit = {
+    val upperBoundCompare = compareRange(upperBound, other.upperBound)
+    val lowerBoundCompare = compareRange(lowerBound, other.lowerBound)
+
+    upperBound = if (upperBoundCompare <0) upperBound else other.upperBound
+    lowerBound = if (lowerBoundCompare >0) lowerBound else other.lowerBound
+
+    isLowerBoundEqualTo = if (lowerBoundCompare == 0)
+      isLowerBoundEqualTo && other.isLowerBoundEqualTo
+    else isLowerBoundEqualTo
+
+    isUpperBoundEqualTo = if (upperBoundCompare == 0)
+      isUpperBoundEqualTo && other.isUpperBoundEqualTo
+    else isUpperBoundEqualTo
+  }
+
+  /**
+   * Function to merge another scan object through an OR operation
+   * @param other Other scan object
+   */
+  def mergeUnion(other:ScanRange): Unit = {
+
+    val upperBoundCompare = compareRange(upperBound, other.upperBound)
+    val lowerBoundCompare = compareRange(lowerBound, other.lowerBound)
+
+    upperBound = if (upperBoundCompare >0) upperBound else other.upperBound
+    lowerBound = if (lowerBoundCompare <0) lowerBound else other.lowerBound
+
+    isLowerBoundEqualTo = if (lowerBoundCompare == 0)
+      isLowerBoundEqualTo || other.isLowerBoundEqualTo
+    else isLowerBoundEqualTo
+
+    isUpperBoundEqualTo = if (upperBoundCompare == 0)
+      isUpperBoundEqualTo || other.isUpperBoundEqualTo
+    else isUpperBoundEqualTo
+  }
+
+  /**
+   * Common function to see if this scan overlaps with another
+   *
+   * Reference Visual
+   *
+   * A                           B
+   * |---------------------------|
+   *   LL--------------LU
+   *        RL--------------RU
+   *
+   * A = lowest value is byte[0]
+   * B = highest value is null
+   * LL = Left Lower Bound
+   * LU = Left Upper Bound
+   * RL = Right Lower Bound
+   * RU = Right Upper Bound
+   *
+   * @param other Other scan object
+   * @return      True if the ranges overlap, false if they do not
+   */
+  def doesOverLap(other:ScanRange): Boolean = {
+
+    var leftRange:ScanRange = null
+    var rightRange:ScanRange = null
+
+    //First identify the Left range
+    // Also lower bound can't be null
+    if (Bytes.compareTo(lowerBound, other.lowerBound) <=0) {
+      leftRange = this
+      rightRange = other
+    } else {
+      leftRange = other
+      rightRange = this
+    }
+
+    //Then see if leftRange.upperBound is null (unbounded) or if it
+    // is greater than or equal to rightRange.lowerBound
+    leftRange.upperBound == null ||
+      Bytes.compareTo(leftRange.upperBound, rightRange.lowerBound) >= 0
+  }
+
+  /**
+   * Special compare logic because we can have null values
+   * for left or right bound
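+   *
+   * A null value is treated as greater than any non-null value, for example:
+   * {{{
+   *   compareRange(null, Bytes.toBytes("a"))  // returns 1
+   *   compareRange(Bytes.toBytes("a"), null)  // returns -1
+   * }}}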
+   *
+   * @param left  Left byte array
+   * @param right Right byte array
+   * @return      0 if equal, 1 if left is greater, and -1 if right is greater
+   */
+  def compareRange(left:Array[Byte], right:Array[Byte]): Int = {
+    if (left == null && right == null) 0
+    else if (left == null && right != null) 1
+    else if (left != null && right == null) -1
+    else Bytes.compareTo(left, right)
+  }
+  override def toString:String = {
+    "ScanRange:(" + Bytes.toString(upperBound) + "," + isUpperBoundEqualTo + "," +
+      Bytes.toString(lowerBound) + "," + isLowerBoundEqualTo + ")"
+  }
+}
+
+/**
+ * Contains information related to the filters for a given column.
+ * This can contain many ranges or points.
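+ *
+ * For example, a simple point filter (the value is illustrative):
+ * {{{
+ *   val f = new ColumnFilter(Bytes.toBytes("foo1"))
+ *   val v = Bytes.toBytes("foo1")
+ *   f.validate(v, 0, v.length)  // true, the point matches
+ * }}}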
+ *
+ * @param currentPoint the initial point when the filter is created
+ * @param currentRange the initial scanRange when the filter is created
+ */
+class ColumnFilter (currentPoint:Array[Byte] = null,
+                     currentRange:ScanRange = null,
+                     var points:mutable.MutableList[Array[Byte]] =
+                     new mutable.MutableList[Array[Byte]](),
+                     var ranges:mutable.MutableList[ScanRange] =
+                     new mutable.MutableList[ScanRange]() ) extends Serializable {
+  //Collection of ranges
+  if (currentRange != null ) ranges.+=(currentRange)
+
+  //Collection of points
+  if (currentPoint != null) points.+=(currentPoint)
+
+  /**
+   * This will validate a given value against the filter's points and/or ranges;
+   * the result indicates whether the value passes the filter
+   *
+   * @param value       Value to be validated
+   * @param valueOffSet The offset of the value
+   * @param valueLength The length of the value
+   * @return            True if the value passes the filter, false if not
+   */
+  def validate(value:Array[Byte], valueOffSet:Int, valueLength:Int):Boolean = {
+    var result = false
+
+    points.foreach( p => {
+      if (Bytes.equals(p, 0, p.length, value, valueOffSet, valueLength)) {
+        result = true
+      }
+    })
+
+    ranges.foreach( r => {
+      val upperBoundPass = r.upperBound == null ||
+        (r.isUpperBoundEqualTo &&
+          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
+            value, valueOffSet, valueLength) >= 0) ||
+        (!r.isUpperBoundEqualTo &&
+          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
+            value, valueOffSet, valueLength) > 0)
+
+      val lowerBoundPass = r.lowerBound == null || r.lowerBound.length == 0 ||
+        (r.isLowerBoundEqualTo &&
+          Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
+            value, valueOffSet, valueLength) <= 0) ||
+        (!r.isLowerBoundEqualTo &&
+          Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
+            value, valueOffSet, valueLength) < 0)
+
+      result = result || (upperBoundPass && lowerBoundPass)
+    })
+    result
+  }
+
+  /**
+   * This will allow us to merge filter logic that is joined to the existing filter
+   * through an OR operator
+   *
+   * @param other Filter to merge
+   */
+  def mergeUnion(other:ColumnFilter): Unit = {
+    other.points.foreach( p => points += p)
+
+    other.ranges.foreach( otherR => {
+      var doesOverLap = false
+      ranges.foreach{ r =>
+        if (r.doesOverLap(otherR)) {
+          r.mergeUnion(otherR)
+          doesOverLap = true
+        }}
+      if (!doesOverLap) ranges.+=(otherR)
+    })
+  }
+
+  /**
+   * This will allow us to merge filter logic that is joined to the existing filter
+   * through an AND operator
+   *
+   * @param other Filter to merge
+   */
+  def mergeIntersect(other:ColumnFilter): Unit = {
+    val survivingPoints = new mutable.MutableList[Array[Byte]]()
+    points.foreach( p => {
+      other.points.foreach( otherP => {
+        if (Bytes.equals(p, otherP)) {
+          survivingPoints.+=(p)
+        }
+      })
+    })
+    points = survivingPoints
+
+    val survivingRanges = new mutable.MutableList[ScanRange]()
+
+    other.ranges.foreach( otherR => {
+      ranges.foreach( r => {
+        if (r.doesOverLap(otherR)) {
+          r.mergeIntersect(otherR)
+          survivingRanges += r
+        }
+      })
+    })
+    ranges = survivingRanges
+  }
+
+  override def toString:String = {
+    val strBuilder = new StringBuilder
+    strBuilder.append("(points:(")
+    var isFirst = true
+    points.foreach( p => {
+      if (isFirst) isFirst = false
+      else strBuilder.append(",")
+      strBuilder.append(Bytes.toString(p))
+    })
+    strBuilder.append("),ranges:")
+    isFirst = true
+    ranges.foreach( r => {
+      if (isFirst) isFirst = false
+      else strBuilder.append(",")
+      strBuilder.append(r)
+    })
+    strBuilder.append("))")
+    strBuilder.toString()
+  }
+}
+
+/**
+ * A collection of ColumnFilters indexed by column names.
+ *
+ * Also contains merge functions that will consolidate the filters
+ * per column name
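+ *
+ * For example, OR-merging two point filters for the same column (values taken
+ * from the unit tests below):
+ * {{{
+ *   val collection = new ColumnFilterCollection
+ *   collection.mergeUnion("B_FIELD", new ColumnFilter(Bytes.toBytes("4")))
+ *   collection.mergeUnion("B_FIELD", new ColumnFilter(Bytes.toBytes("10")))
+ *   // collection.columnFilterMap("B_FIELD") now holds both points
+ * }}}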
+ */
+class ColumnFilterCollection {
+  val columnFilterMap = new mutable.HashMap[String, ColumnFilter]
+
+  def clear(): Unit = {
+    columnFilterMap.clear()
+  }
+
+  /**
+   * This will allow us to merge filter logic that is joined to the existing filter
+   * through a OR operator.  This will merge a single columns filter
+   *
+   * @param column The column to be merged
+   * @param other  The other ColumnFilter object to merge
+   */
+  def mergeUnion(column:String, other:ColumnFilter): Unit = {
+    val existingFilter = columnFilterMap.get(column)
+    if (existingFilter.isEmpty) {
+      columnFilterMap.+=((column, other))
+    } else {
+      existingFilter.get.mergeUnion(other)
+    }
+  }
+
+  /**
+   * This will allow us to merge all filters in the existing collection
+   * to the filters in the other collection.  All merges are done as a result
+   * of an OR operator
+   *
+   * @param other The other Column Filter Collection to be merged
+   */
+  def mergeUnion(other:ColumnFilterCollection): Unit = {
+    other.columnFilterMap.foreach( e => {
+      mergeUnion(e._1, e._2)
+    })
+  }
+
+  /**
+   * This will allow us to merge all filters in the existing collection
+   * to the filters in the other collection.  All merges are done as a result
+   * of an AND operator
+   *
+   * @param other The column filter from the other collection
+   */
+  def mergeIntersect(other:ColumnFilterCollection): Unit = {
+    other.columnFilterMap.foreach( e => {
+      val existingColumnFilter = columnFilterMap.get(e._1)
+      if (existingColumnFilter.isEmpty) {
+        columnFilterMap += e
+      } else {
+        existingColumnFilter.get.mergeIntersect(e._2)
+      }
+    })
+  }
+
+  /**
+   * This will collect all the filter information in a way that is optimized
+   * for the HBase filter command, allowing the filter to be accessed
+   * with columnFamily and qualifier information
+   *
+   * @param schemaDefinitionMap Schema Map that will help us map the right filters
+   *                            to the correct columns
+   * @return                    HashMap of column filters
+   */
+  def generateFamilyQualifiterFilterMap(schemaDefinitionMap:
+                                        java.util.HashMap[String,
+                                          SchemaQualifierDefinition]):
+  util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter] = {
+    val familyQualifierFilterMap =
+      new util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter]()
+
+    columnFilterMap.foreach( e => {
+      val definition = schemaDefinitionMap.get(e._1)
+      //Don't add rowKeyFilter
+      if (definition.columnFamilyBytes.size > 0) {
+        familyQualifierFilterMap.put(
+          new ColumnFamilyQualifierMapKeyWrapper(
+            definition.columnFamilyBytes, 0, definition.columnFamilyBytes.length,
+            definition.qualifierBytes, 0, definition.qualifierBytes.length), e._2)
+      }
+    })
+    familyQualifierFilterMap
+  }
+
+  override def toString:String = {
+    val strBuilder = new StringBuilder
+    columnFilterMap.foreach( e => strBuilder.append(e))
+    strBuilder.toString()
+  }
+}
+
+/**
+ * Helper object that stores static functions and also holds the last executed
+ * information, which can be used for unit testing.
+ */
+object DefaultSourceStaticUtils {
+
+  val rawInteger = new RawInteger
+  val rawLong = new RawLong
+  val rawFloat = new RawFloat
+  val rawDouble = new RawDouble
+  val rawString = RawString.ASCENDING
+
+  val byteRange = new ThreadLocal[PositionedByteRange]{
+    override def initialValue(): PositionedByteRange = {
+      val range = new SimplePositionedMutableByteRange()
+      range.setOffset(0)
+      range.setPosition(0)
+    }
+  }
+
+  def getFreshByteRange(bytes:Array[Byte]): PositionedByteRange = {
+    getFreshByteRange(bytes, 0, bytes.length)
+  }
+
+  def getFreshByteRange(bytes:Array[Byte],  offset:Int = 0, length:Int): PositionedByteRange = {
+    byteRange.get().set(bytes).setLength(length).setOffset(offset)
+  }
+
+  //This will contain the last 5 filters and required fields used in buildScan
+  // These values can be used in unit testing to make sure we are converting
+  // the Spark SQL input correctly
+  val lastFiveExecutionRules =
+    new ConcurrentLinkedQueue[ExecutionRuleForUnitTesting]()
+
+  /**
+   * This method is to populate the lastFiveExecutionRules for unit test purposes
+   * This method is not thread safe.
+   *
+   * @param columnFilterCollection           The filters in the last job
+   * @param requiredQualifierDefinitionArray The required columns in the last job
+   */
+  def populateLatestExecutionRules(columnFilterCollection: ColumnFilterCollection,
+                                   requiredQualifierDefinitionArray:
+                                   mutable.MutableList[SchemaQualifierDefinition]):Unit = {
+    lastFiveExecutionRules.add(new ExecutionRuleForUnitTesting(
+      columnFilterCollection, requiredQualifierDefinitionArray))
+    while (lastFiveExecutionRules.size() > 5) {
+      lastFiveExecutionRules.poll()
+    }
+  }
+
+  /**
+   * This method will convert the result content from HBase into the
+   * SQL value type that is requested by the Spark SQL schema definition
+   *
+   * @param columnName              The name of the SparkSQL Column
+   * @param schemaMappingDefinition The schema definition map
+   * @param r                       The result object from HBase
+   * @return                        The converted object type
+   */
+  def getValue(columnName: String,
+               schemaMappingDefinition:
+               java.util.HashMap[String, SchemaQualifierDefinition],
+               r: Result): Any = {
+
+    val columnDef = schemaMappingDefinition.get(columnName)
+
+    if (columnDef == null) throw new IllegalArgumentException("Unknown column:" + columnName)
+
+
+    if (columnDef.columnFamilyBytes.isEmpty) {
+      val row = r.getRow
+
+      columnDef.columnSparkSqlType match {
+        case IntegerType => rawInteger.decode(getFreshByteRange(row))
+        case LongType => rawLong.decode(getFreshByteRange(row))
+        case FloatType => rawFloat.decode(getFreshByteRange(row))
+        case DoubleType => rawDouble.decode(getFreshByteRange(row))
+        case StringType => rawString.decode(getFreshByteRange(row))
+        case TimestampType => rawLong.decode(getFreshByteRange(row))
+        case _ => Bytes.toString(row)
+      }
+    } else {
+      val cellByteValue =
+        r.getColumnLatestCell(columnDef.columnFamilyBytes, columnDef.qualifierBytes)
+      if (cellByteValue == null) null
+      else columnDef.columnSparkSqlType match {
+        case IntegerType => rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray,
+          cellByteValue.getValueOffset, cellByteValue.getValueLength))
+        case LongType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
+          cellByteValue.getValueOffset, cellByteValue.getValueLength))
+        case FloatType => rawFloat.decode(getFreshByteRange(cellByteValue.getValueArray,
+          cellByteValue.getValueOffset, cellByteValue.getValueLength))
+        case DoubleType => rawDouble.decode(getFreshByteRange(cellByteValue.getValueArray,
+          cellByteValue.getValueOffset, cellByteValue.getValueLength))
+        case StringType => Bytes.toString(cellByteValue.getValueArray,
+          cellByteValue.getValueOffset, cellByteValue.getValueLength)
+        case TimestampType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
+          cellByteValue.getValueOffset, cellByteValue.getValueLength))
+        case _ => Bytes.toString(cellByteValue.getValueArray,
+          cellByteValue.getValueOffset, cellByteValue.getValueLength)
+      }
+    }
+  }
+
+  /**
+   * This will convert the value from SparkSQL to be stored into HBase using the
+   * correct byte representation
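+   *
+   * For example, with the STRING column mapping used in the unit tests below:
+   * {{{
+   *   getByteValue("A_FIELD", schemaMappingDefinition, "foo1")
+   *   // -> Bytes.toBytes("foo1")
+   * }}}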
+   *
+   * @param columnName              SparkSQL column name
+   * @param schemaMappingDefinition Schema definition map
+   * @param value                   String value from SparkSQL
+   * @return                        Returns the byte array to go into HBase
+   */
+  def getByteValue(columnName: String,
+                   schemaMappingDefinition:
+                   java.util.HashMap[String, SchemaQualifierDefinition],
+                   value: String): Array[Byte] = {
+
+    val columnDef = schemaMappingDefinition.get(columnName)
+
+    if (columnDef == null) {
+      throw new IllegalArgumentException("Unknown column:" + columnName)
+    } else {
+      columnDef.columnSparkSqlType match {
+        case IntegerType =>
+          val result = new Array[Byte](Bytes.SIZEOF_INT)
+          val localDataRange = getFreshByteRange(result)
+          rawInteger.encode(localDataRange, value.toInt)
+          localDataRange.getBytes
+        case LongType =>
+          val result = new Array[Byte](Bytes.SIZEOF_LONG)
+          val localDataRange = getFreshByteRange(result)
+          rawLong.encode(localDataRange, value.toLong)
+          localDataRange.getBytes
+        case FloatType =>
+          val result = new Array[Byte](Bytes.SIZEOF_FLOAT)
+          val localDataRange = getFreshByteRange(result)
+          rawFloat.encode(localDataRange, value.toFloat)
+          localDataRange.getBytes
+        case DoubleType =>
+          val result = new Array[Byte](Bytes.SIZEOF_DOUBLE)
+          val localDataRange = getFreshByteRange(result)
+          rawDouble.encode(localDataRange, value.toDouble)
+          localDataRange.getBytes
+        case StringType =>
+          Bytes.toBytes(value)
+        case TimestampType =>
+          val result = new Array[Byte](Bytes.SIZEOF_LONG)
+          val localDataRange = getFreshByteRange(result)
+          rawLong.encode(localDataRange, value.toLong)
+          localDataRange.getBytes
+
+        case _ => Bytes.toBytes(value)
+      }
+    }
+  }
+}
+
+class ExecutionRuleForUnitTesting(val columnFilterCollection: ColumnFilterCollection,
+                                  val requiredQualifierDefinitionArray:
+                                  mutable.MutableList[SchemaQualifierDefinition])

http://git-wip-us.apache.org/repos/asf/hbase/blob/e95358a7/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 9d14e22..57ae6b0 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -68,6 +68,8 @@ class HBaseContext(@transient sc: SparkContext,
   val broadcastedConf = sc.broadcast(new SerializableWritable(config))
   val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials))
 
+  LatestHBaseContextCache.latest = this
+
   if (tmpHdfsConfgFile != null && config != null) {
     val fs = FileSystem.newInstance(config)
     val tmpPath = new Path(tmpHdfsConfgFile)
@@ -838,3 +840,7 @@ class HBaseContext(@transient sc: SparkContext,
     }
   }
 }
+
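+/**
+ * Holds a reference to the most recently constructed HBaseContext so that
+ * components such as DefaultSource can reuse its connection information
+ * when hbase.use.hbase.context is set to true.
+ */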
+object LatestHBaseContextCache {
+  var latest:HBaseContext = null
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e95358a7/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
new file mode 100644
index 0000000..fb475b0
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.client.{Put, ConnectionFactory}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{TableNotFoundException, TableName, HBaseTestingUtility}
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkContext, Logging}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+class DefaultSourceSuite extends FunSuite with
+BeforeAndAfterEach with BeforeAndAfterAll with Logging {
+  @transient var sc: SparkContext = null
+  var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
+
+  val tableName = "t1"
+  val columnFamily = "c"
+
+  var sqlContext:SQLContext = null
+  var df:DataFrame = null
+
+  override def beforeAll() {
+
+    TEST_UTIL.startMiniCluster
+
+    logInfo(" - minicluster started")
+    try
+      TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+    catch {
+      case e: Exception => logInfo(" - no table " + tableName + " found")
+
+    }
+    logInfo(" - creating table " + tableName)
+    TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
+    logInfo(" - created table")
+
+    sc = new SparkContext("local", "test")
+
+    val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
+    val table = connection.getTable(TableName.valueOf("t1"))
+
+    try {
+      var put = new Put(Bytes.toBytes("get1"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
+      table.put(put)
+      put = new Put(Bytes.toBytes("get2"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("get3"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+      table.put(put)
+      put = new Put(Bytes.toBytes("get4"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("get5"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+      table.put(put)
+    } finally {
+      table.close()
+      connection.close()
+    }
+
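+    // The HBaseContext created here is expected to supply the HBase configuration
+    // to the DataSource loaded below.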
+    new HBaseContext(sc, TEST_UTIL.getConfiguration)
+    sqlContext = new SQLContext(sc)
+
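+    // Load a DataFrame through the HBase DataSource. The mapping string pairs each
+    // SparkSQL field with a type and an HBase column (':key' denotes the row key).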
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
+        "hbase.table" -> "t1",
+        "hbase.batching.num" -> "100",
+        "cachingNum" -> "100"))
+
+    df.registerTempTable("hbaseTmp")
+  }
+
+  override def afterAll() {
+    TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+    logInfo("shuting down minicluster")
+    TEST_UTIL.shutdownMiniCluster()
+
+    sc.stop()
+  }
+
+
+  /**
+   * An example of querying three fields while using only row key points for the filter
+   */
+  test("Test rowKey point only rowKey query") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 3)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 0)
+    assert(keyFieldFilter.points.length == 3)
+    assert(Bytes.toString(keyFieldFilter.points.head).equals("get1"))
+    assert(Bytes.toString(keyFieldFilter.points(1)).equals("get2"))
+    assert(Bytes.toString(keyFieldFilter.points(2)).equals("get3"))
+
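+    // KEY_FIELD maps to the row key, so only A_FIELD and B_FIELD need qualifier definitions.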
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  /**
+   * An example of querying three fields while using only cell points for the filter
+   */
+  test("Test cell point only rowKey query") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "(B_FIELD = '4' or B_FIELD = '10' or A_FIELD = 'foo1')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 3)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 2)
+    val bFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("B_FIELD").get
+    assert(bFieldFilter.ranges.length == 0)
+    assert(bFieldFilter.points.length == 2)
+    val aFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("A_FIELD").get
+    assert(aFieldFilter.ranges.length == 0)
+    assert(aFieldFilter.points.length == 1)
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  /**
+   * An example of an OR merge between two ranges; the result is two ranges.
+   * Also an example of less-than and greater-than.
+   */
+  test("Test two range rowKey query") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "( KEY_FIELD < 'get2' or KEY_FIELD > 'get3')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 3)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 2)
+    assert(keyFieldFilter.points.length == 0)
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  /**
+   * An example of an AND merge between two ranges; the result is one range.
+   * Also an example of less-than-or-equal-to and greater-than-or-equal-to.
+   */
+  test("Test one combined range rowKey query") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 2)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 1)
+    assert(keyFieldFilter.points.length == 0)
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  /**
+   * Do a select with no filters
+   */
+  test("Test select only query") {
+
+    val results = df.select("KEY_FIELD").take(10)
+    assert(results.length == 5)
+
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+    assert(executionRules.columnFilterCollection == null)
+    assert(executionRules.requiredQualifierDefinitionArray.length == 0)
+
+  }
+
+  /**
+   * A complex query with one point and one range for both the
+   * rowKey and a column
+   */
+  test("Test SQL point and range combo") {
+    val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
+      "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 3)
+
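+    // The WHERE clause should push down one point (get1) plus one lower-bounded range (>= 'get3')
+    // for KEY_FIELD, and one point ('8') plus one upper-bounded range (< '3') for B_FIELD.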
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 2)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 1)
+    assert(keyFieldFilter.ranges.head.upperBound == null)
+    assert(Bytes.toString(keyFieldFilter.ranges.head.lowerBound).equals("get3"))
+    assert(keyFieldFilter.ranges.head.isLowerBoundEqualTo)
+    assert(keyFieldFilter.points.length == 1)
+    assert(Bytes.toString(keyFieldFilter.points.head).equals("get1"))
+
+    val bFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("B_FIELD").get
+    assert(bFieldFilter.ranges.length == 1)
+    assert(bFieldFilter.ranges.head.lowerBound.length == 0)
+    assert(Bytes.toString(bFieldFilter.ranges.head.upperBound).equals("3"))
+    assert(!bFieldFilter.ranges.head.isUpperBoundEqualTo)
+    assert(bFieldFilter.points.length == 1)
+    assert(Bytes.toString(bFieldFilter.points.head).equals("8"))
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 1)
+    assert(executionRules.requiredQualifierDefinitionArray.head.columnName.equals("B_FIELD"))
+    assert(executionRules.requiredQualifierDefinitionArray.head.columnFamily.equals("c"))
+    assert(executionRules.requiredQualifierDefinitionArray.head.qualifier.equals("b"))
+  }
+
+  /**
+   * A complex query with two complex ranges that don't merge into one
+   */
+  test("Test two complete range non merge rowKey query") {
+
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" +
+      "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+    assert(results.length == 4)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 2)
+    assert(keyFieldFilter.points.length == 0)
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  /**
+   * A complex query with two complex ranges that do merge into one
+   */
+  test("Test two complete range merge rowKey query") {
+    val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTmp " +
+      "WHERE " +
+      "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
+      "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10)
+
+    val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
+
+    assert(results.length == 5)
+
+    assert(executionRules.columnFilterCollection.columnFilterMap.size == 1)
+    val keyFieldFilter =
+      executionRules.columnFilterCollection.columnFilterMap.get("KEY_FIELD").get
+    assert(keyFieldFilter.ranges.length == 1)
+    assert(keyFieldFilter.points.length == 0)
+
+    assert(executionRules.requiredQualifierDefinitionArray.length == 2)
+  }
+
+  test("test table that doesn't exist") {
+    intercept[TableNotFoundException] {
+      df = sqlContext.load("org.apache.hadoop.hbase.spark",
+        Map("hbase.columns.mapping" ->
+          "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
+          "hbase.table" -> "t1NotThere"))
+
+      df.registerTempTable("hbaseNonExistingTmp")
+
+      sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseNonExistingTmp " +
+        "WHERE " +
+        "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
+        "( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").count()
+    }
+  }
+
+  test("Test table with column that doesn't exist") {
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, C_FIELD STRING c:c,",
+        "hbase.table" -> "t1"))
+
+    df.registerTempTable("hbaseFactColumnTmp")
+
+    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseFactColumnTmp")
+
+    assert(result.count() == 5)
+
+    val localResult = result.take(5)
+  }
+
+  test("Test table with INT column") {
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD INT c:i,",
+        "hbase.table" -> "t1"))
+
+    df.registerTempTable("hbaseIntTmp")
+
+    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntTmp"+
+    " where I_FIELD > 4 and I_FIELD < 10")
+
+    assert(result.count() == 2)
+
+    val localResult = result.take(3)
+
+    assert(localResult(0).getInt(2) == 8)
+  }
+
+  test("Test table with INT column defined at wrong type") {
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
+        "hbase.table" -> "t1"))
+
+    df.registerTempTable("hbaseIntWrongTypeTmp")
+
+    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+
+    assert(result.count() == 5)
+
+    val localResult = result.take(5)
+
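+    // c:i holds a 4-byte big-endian int; read back as a STRING, the first row returned
+    // (get1, where i = 1) surfaces as four characters with byte values 0, 0, 0, 1.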
+    assert(localResult(0).getString(2).length == 4)
+    assert(localResult(0).getString(2).charAt(0).toByte == 0)
+    assert(localResult(0).getString(2).charAt(1).toByte == 0)
+    assert(localResult(0).getString(2).charAt(2).toByte == 0)
+    assert(localResult(0).getString(2).charAt(3).toByte == 1)
+  }
+
+  test("Test improperly formatted column mapping") {
+    intercept[IllegalArgumentException] {
+      df = sqlContext.load("org.apache.hadoop.hbase.spark",
+        Map("hbase.columns.mapping" ->
+          "KEY_FIELD,STRING,:key, A_FIELD,STRING,c:a, B_FIELD,STRING,c:b, I_FIELD,STRING,c:i,",
+          "hbase.table" -> "t1"))
+
+      df.registerTempTable("hbaseBadTmp")
+
+      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseBadTmp")
+
+      val localResult = result.take(5)
+    }
+  }
+
+
+  test("Test bad column type") {
+    intercept[IllegalArgumentException] {
+      df = sqlContext.load("org.apache.hadoop.hbase.spark",
+        Map("hbase.columns.mapping" ->
+          "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
+          "hbase.table" -> "t1"))
+
+      df.registerTempTable("hbaseIntWrongTypeTmp")
+
+      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+
+      assert(result.count() == 5)
+
+      val localResult = result.take(5)
+    }
+  }
+
+  test("Test bad hbase.batching.num type") {
+    intercept[IllegalArgumentException] {
+      df = sqlContext.load("org.apache.hadoop.hbase.spark",
+        Map("hbase.columns.mapping" ->
+          "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
+          "hbase.table" -> "t1", "hbase.batching.num" -> "foo"))
+
+      df.registerTempTable("hbaseIntWrongTypeTmp")
+
+      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+
+      assert(result.count() == 5)
+
+      val localResult = result.take(5)
+    }
+  }
+
+  test("Test bad hbase.caching.num type") {
+    intercept[IllegalArgumentException] {
+      df = sqlContext.load("org.apache.hadoop.hbase.spark",
+        Map("hbase.columns.mapping" ->
+          "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
+          "hbase.table" -> "t1", "hbase.caching.num" -> "foo"))
+
+      df.registerTempTable("hbaseIntWrongTypeTmp")
+
+      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
+
+      assert(result.count() == 5)
+
+      val localResult = result.take(5)
+    }
+  }
+
+  test("Test table with sparse column") {
+    df = sqlContext.load("org.apache.hadoop.hbase.spark",
+      Map("hbase.columns.mapping" ->
+        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, Z_FIELD STRING c:z,",
+        "hbase.table" -> "t1"))
+
+    df.registerTempTable("hbaseZTmp")
+
+    val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, Z_FIELD FROM hbaseZTmp")
+
+    assert(result.count() == 5)
+
+    val localResult = result.take(5)
+
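+    // Rows that never wrote c:z (get1, get3, get5) come back with a null Z_FIELD.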
+    assert(localResult(0).getString(2) == null)
+    assert(localResult(1).getString(2) == "FOO")
+    assert(localResult(2).getString(2) == null)
+    assert(localResult(3).getString(2) == "BAR")
+    assert(localResult(4).getString(2) == null)
+
+  }
+}


[2/2] hbase git commit: HBASE-14181 Add Spark DataFrame DataSource to HBase-Spark Module

Posted by bu...@apache.org.
HBASE-14181 Add Spark DataFrame DataSource to HBase-Spark Module

Signed-off-by: Sean Busbey <bu...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e95358a7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e95358a7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e95358a7

Branch: refs/heads/master
Commit: e95358a7fc3f554dcbb351c8b7295cafc01e8c23
Parents: bada19b
Author: Ted Malaska <te...@cloudera.com>
Authored: Mon Sep 7 09:19:52 2015 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Mon Sep 7 09:44:10 2015 -0500

----------------------------------------------------------------------
 .../hbase/protobuf/generated/FilterProtos.java  | 1825 +++++++++++++++++-
 hbase-protocol/src/main/protobuf/Filter.proto   |   13 +
 hbase-spark/pom.xml                             |   12 +
 .../hbase/spark/SparkSQLPushDownFilter.java     |  186 ++
 .../ColumnFamilyQualifierMapKeyWrapper.scala    |   73 +
 .../hadoop/hbase/spark/DefaultSource.scala      |  982 ++++++++++
 .../hadoop/hbase/spark/HBaseContext.scala       |    6 +
 .../hadoop/hbase/spark/DefaultSourceSuite.scala |  462 +++++
 8 files changed, 3556 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e95358a7/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java
index e558371..fe18cae 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java
@@ -17300,6 +17300,1797 @@ public final class FilterProtos {
     // @@protoc_insertion_point(class_scope:hbase.pb.MultiRowRangeFilter)
   }
 
+  public interface SQLPredicatePushDownColumnFilterOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // required bytes column_family = 1;
+    /**
+     * <code>required bytes column_family = 1;</code>
+     */
+    boolean hasColumnFamily();
+    /**
+     * <code>required bytes column_family = 1;</code>
+     */
+    com.google.protobuf.ByteString getColumnFamily();
+
+    // required bytes qualifier = 2;
+    /**
+     * <code>required bytes qualifier = 2;</code>
+     */
+    boolean hasQualifier();
+    /**
+     * <code>required bytes qualifier = 2;</code>
+     */
+    com.google.protobuf.ByteString getQualifier();
+
+    // repeated bytes get_point_list = 3;
+    /**
+     * <code>repeated bytes get_point_list = 3;</code>
+     */
+    java.util.List<com.google.protobuf.ByteString> getGetPointListList();
+    /**
+     * <code>repeated bytes get_point_list = 3;</code>
+     */
+    int getGetPointListCount();
+    /**
+     * <code>repeated bytes get_point_list = 3;</code>
+     */
+    com.google.protobuf.ByteString getGetPointList(int index);
+
+    // repeated .hbase.pb.RowRange range_list = 4;
+    /**
+     * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+     */
+    java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange>
+        getRangeListList();
+    /**
+     * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange getRangeList(int index);
+    /**
+     * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+     */
+    int getRangeListCount();
+    /**
+     * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+     */
+    java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder>
+        getRangeListOrBuilderList();
+    /**
+     * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder getRangeListOrBuilder(
+        int index);
+  }
+  /**
+   * Protobuf type {@code hbase.pb.SQLPredicatePushDownColumnFilter}
+   */
+  public static final class SQLPredicatePushDownColumnFilter extends
+      com.google.protobuf.GeneratedMessage
+      implements SQLPredicatePushDownColumnFilterOrBuilder {
+    // Use SQLPredicatePushDownColumnFilter.newBuilder() to construct.
+    private SQLPredicatePushDownColumnFilter(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private SQLPredicatePushDownColumnFilter(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final SQLPredicatePushDownColumnFilter defaultInstance;
+    public static SQLPredicatePushDownColumnFilter getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public SQLPredicatePushDownColumnFilter getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private SQLPredicatePushDownColumnFilter(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              columnFamily_ = input.readBytes();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              qualifier_ = input.readBytes();
+              break;
+            }
+            case 26: {
+              if (!((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+                getPointList_ = new java.util.ArrayList<com.google.protobuf.ByteString>();
+                mutable_bitField0_ |= 0x00000004;
+              }
+              getPointList_.add(input.readBytes());
+              break;
+            }
+            case 34: {
+              if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+                rangeList_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange>();
+                mutable_bitField0_ |= 0x00000008;
+              }
+              rangeList_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.PARSER, extensionRegistry));
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+          getPointList_ = java.util.Collections.unmodifiableList(getPointList_);
+        }
+        if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+          rangeList_ = java.util.Collections.unmodifiableList(rangeList_);
+        }
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.class, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<SQLPredicatePushDownColumnFilter> PARSER =
+        new com.google.protobuf.AbstractParser<SQLPredicatePushDownColumnFilter>() {
+      public SQLPredicatePushDownColumnFilter parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new SQLPredicatePushDownColumnFilter(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<SQLPredicatePushDownColumnFilter> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // required bytes column_family = 1;
+    public static final int COLUMN_FAMILY_FIELD_NUMBER = 1;
+    private com.google.protobuf.ByteString columnFamily_;
+    /**
+     * <code>required bytes column_family = 1;</code>
+     */
+    public boolean hasColumnFamily() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>required bytes column_family = 1;</code>
+     */
+    public com.google.protobuf.ByteString getColumnFamily() {
+      return columnFamily_;
+    }
+
+    // required bytes qualifier = 2;
+    public static final int QUALIFIER_FIELD_NUMBER = 2;
+    private com.google.protobuf.ByteString qualifier_;
+    /**
+     * <code>required bytes qualifier = 2;</code>
+     */
+    public boolean hasQualifier() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>required bytes qualifier = 2;</code>
+     */
+    public com.google.protobuf.ByteString getQualifier() {
+      return qualifier_;
+    }
+
+    // repeated bytes get_point_list = 3;
+    public static final int GET_POINT_LIST_FIELD_NUMBER = 3;
+    private java.util.List<com.google.protobuf.ByteString> getPointList_;
+    /**
+     * <code>repeated bytes get_point_list = 3;</code>
+     */
+    public java.util.List<com.google.protobuf.ByteString>
+        getGetPointListList() {
+      return getPointList_;
+    }
+    /**
+     * <code>repeated bytes get_point_list = 3;</code>
+     */
+    public int getGetPointListCount() {
+      return getPointList_.size();
+    }
+    /**
+     * <code>repeated bytes get_point_list = 3;</code>
+     */
+    public com.google.protobuf.ByteString getGetPointList(int index) {
+      return getPointList_.get(index);
+    }
+
+    // repeated .hbase.pb.RowRange range_list = 4;
+    public static final int RANGE_LIST_FIELD_NUMBER = 4;
+    private java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange> rangeList_;
+    /**
+     * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+     */
+    public java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange> getRangeListList() {
+      return rangeList_;
+    }
+    /**
+     * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+     */
+    public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder>
+        getRangeListOrBuilderList() {
+      return rangeList_;
+    }
+    /**
+     * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+     */
+    public int getRangeListCount() {
+      return rangeList_.size();
+    }
+    /**
+     * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange getRangeList(int index) {
+      return rangeList_.get(index);
+    }
+    /**
+     * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder getRangeListOrBuilder(
+        int index) {
+      return rangeList_.get(index);
+    }
+
+    private void initFields() {
+      columnFamily_ = com.google.protobuf.ByteString.EMPTY;
+      qualifier_ = com.google.protobuf.ByteString.EMPTY;
+      getPointList_ = java.util.Collections.emptyList();
+      rangeList_ = java.util.Collections.emptyList();
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      if (!hasColumnFamily()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasQualifier()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, columnFamily_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, qualifier_);
+      }
+      for (int i = 0; i < getPointList_.size(); i++) {
+        output.writeBytes(3, getPointList_.get(i));
+      }
+      for (int i = 0; i < rangeList_.size(); i++) {
+        output.writeMessage(4, rangeList_.get(i));
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, columnFamily_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, qualifier_);
+      }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < getPointList_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(getPointList_.get(i));
+        }
+        size += dataSize;
+        size += 1 * getGetPointListList().size();
+      }
+      for (int i = 0; i < rangeList_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(4, rangeList_.get(i));
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter)) {
+        return super.equals(obj);
+      }
+      org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter other = (org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter) obj;
+
+      boolean result = true;
+      result = result && (hasColumnFamily() == other.hasColumnFamily());
+      if (hasColumnFamily()) {
+        result = result && getColumnFamily()
+            .equals(other.getColumnFamily());
+      }
+      result = result && (hasQualifier() == other.hasQualifier());
+      if (hasQualifier()) {
+        result = result && getQualifier()
+            .equals(other.getQualifier());
+      }
+      result = result && getGetPointListList()
+          .equals(other.getGetPointListList());
+      result = result && getRangeListList()
+          .equals(other.getRangeListList());
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (hasColumnFamily()) {
+        hash = (37 * hash) + COLUMN_FAMILY_FIELD_NUMBER;
+        hash = (53 * hash) + getColumnFamily().hashCode();
+      }
+      if (hasQualifier()) {
+        hash = (37 * hash) + QUALIFIER_FIELD_NUMBER;
+        hash = (53 * hash) + getQualifier().hashCode();
+      }
+      if (getGetPointListCount() > 0) {
+        hash = (37 * hash) + GET_POINT_LIST_FIELD_NUMBER;
+        hash = (53 * hash) + getGetPointListList().hashCode();
+      }
+      if (getRangeListCount() > 0) {
+        hash = (37 * hash) + RANGE_LIST_FIELD_NUMBER;
+        hash = (53 * hash) + getRangeListList().hashCode();
+      }
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code hbase.pb.SQLPredicatePushDownColumnFilter}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.class, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder.class);
+      }
+
+      // Construct using org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+          getRangeListFieldBuilder();
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        columnFamily_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        qualifier_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        getPointList_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000004);
+        if (rangeListBuilder_ == null) {
+          rangeList_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000008);
+        } else {
+          rangeListBuilder_.clear();
+        }
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor;
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter getDefaultInstanceForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.getDefaultInstance();
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter build() {
+        org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter buildPartial() {
+        org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter result = new org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.columnFamily_ = columnFamily_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.qualifier_ = qualifier_;
+        if (((bitField0_ & 0x00000004) == 0x00000004)) {
+          getPointList_ = java.util.Collections.unmodifiableList(getPointList_);
+          bitField0_ = (bitField0_ & ~0x00000004);
+        }
+        result.getPointList_ = getPointList_;
+        if (rangeListBuilder_ == null) {
+          if (((bitField0_ & 0x00000008) == 0x00000008)) {
+            rangeList_ = java.util.Collections.unmodifiableList(rangeList_);
+            bitField0_ = (bitField0_ & ~0x00000008);
+          }
+          result.rangeList_ = rangeList_;
+        } else {
+          result.rangeList_ = rangeListBuilder_.build();
+        }
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter) {
+          return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter other) {
+        if (other == org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.getDefaultInstance()) return this;
+        if (other.hasColumnFamily()) {
+          setColumnFamily(other.getColumnFamily());
+        }
+        if (other.hasQualifier()) {
+          setQualifier(other.getQualifier());
+        }
+        if (!other.getPointList_.isEmpty()) {
+          if (getPointList_.isEmpty()) {
+            getPointList_ = other.getPointList_;
+            bitField0_ = (bitField0_ & ~0x00000004);
+          } else {
+            ensureGetPointListIsMutable();
+            getPointList_.addAll(other.getPointList_);
+          }
+          onChanged();
+        }
+        if (rangeListBuilder_ == null) {
+          if (!other.rangeList_.isEmpty()) {
+            if (rangeList_.isEmpty()) {
+              rangeList_ = other.rangeList_;
+              bitField0_ = (bitField0_ & ~0x00000008);
+            } else {
+              ensureRangeListIsMutable();
+              rangeList_.addAll(other.rangeList_);
+            }
+            onChanged();
+          }
+        } else {
+          if (!other.rangeList_.isEmpty()) {
+            if (rangeListBuilder_.isEmpty()) {
+              rangeListBuilder_.dispose();
+              rangeListBuilder_ = null;
+              rangeList_ = other.rangeList_;
+              bitField0_ = (bitField0_ & ~0x00000008);
+              rangeListBuilder_ =
+                com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+                   getRangeListFieldBuilder() : null;
+            } else {
+              rangeListBuilder_.addAllMessages(other.rangeList_);
+            }
+          }
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        if (!hasColumnFamily()) {
+
+          return false;
+        }
+        if (!hasQualifier()) {
+
+          return false;
+        }
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // required bytes column_family = 1;
+      private com.google.protobuf.ByteString columnFamily_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>required bytes column_family = 1;</code>
+       */
+      public boolean hasColumnFamily() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>required bytes column_family = 1;</code>
+       */
+      public com.google.protobuf.ByteString getColumnFamily() {
+        return columnFamily_;
+      }
+      /**
+       * <code>required bytes column_family = 1;</code>
+       */
+      public Builder setColumnFamily(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        columnFamily_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required bytes column_family = 1;</code>
+       */
+      public Builder clearColumnFamily() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        columnFamily_ = getDefaultInstance().getColumnFamily();
+        onChanged();
+        return this;
+      }
+
+      // required bytes qualifier = 2;
+      private com.google.protobuf.ByteString qualifier_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>required bytes qualifier = 2;</code>
+       */
+      public boolean hasQualifier() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>required bytes qualifier = 2;</code>
+       */
+      public com.google.protobuf.ByteString getQualifier() {
+        return qualifier_;
+      }
+      /**
+       * <code>required bytes qualifier = 2;</code>
+       */
+      public Builder setQualifier(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        qualifier_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required bytes qualifier = 2;</code>
+       */
+      public Builder clearQualifier() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        qualifier_ = getDefaultInstance().getQualifier();
+        onChanged();
+        return this;
+      }
+
+      // repeated bytes get_point_list = 3;
+      private java.util.List<com.google.protobuf.ByteString> getPointList_ = java.util.Collections.emptyList();
+      private void ensureGetPointListIsMutable() {
+        if (!((bitField0_ & 0x00000004) == 0x00000004)) {
+          getPointList_ = new java.util.ArrayList<com.google.protobuf.ByteString>(getPointList_);
+          bitField0_ |= 0x00000004;
+         }
+      }
+      /**
+       * <code>repeated bytes get_point_list = 3;</code>
+       */
+      public java.util.List<com.google.protobuf.ByteString>
+          getGetPointListList() {
+        return java.util.Collections.unmodifiableList(getPointList_);
+      }
+      /**
+       * <code>repeated bytes get_point_list = 3;</code>
+       */
+      public int getGetPointListCount() {
+        return getPointList_.size();
+      }
+      /**
+       * <code>repeated bytes get_point_list = 3;</code>
+       */
+      public com.google.protobuf.ByteString getGetPointList(int index) {
+        return getPointList_.get(index);
+      }
+      /**
+       * <code>repeated bytes get_point_list = 3;</code>
+       */
+      public Builder setGetPointList(
+          int index, com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureGetPointListIsMutable();
+        getPointList_.set(index, value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes get_point_list = 3;</code>
+       */
+      public Builder addGetPointList(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureGetPointListIsMutable();
+        getPointList_.add(value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes get_point_list = 3;</code>
+       */
+      public Builder addAllGetPointList(
+          java.lang.Iterable<? extends com.google.protobuf.ByteString> values) {
+        ensureGetPointListIsMutable();
+        super.addAll(values, getPointList_);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes get_point_list = 3;</code>
+       */
+      public Builder clearGetPointList() {
+        getPointList_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000004);
+        onChanged();
+        return this;
+      }
+
+      // repeated .hbase.pb.RowRange range_list = 4;
+      private java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange> rangeList_ =
+        java.util.Collections.emptyList();
+      private void ensureRangeListIsMutable() {
+        if (!((bitField0_ & 0x00000008) == 0x00000008)) {
+          rangeList_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange>(rangeList_);
+          bitField0_ |= 0x00000008;
+         }
+      }
+
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder> rangeListBuilder_;
+
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange> getRangeListList() {
+        if (rangeListBuilder_ == null) {
+          return java.util.Collections.unmodifiableList(rangeList_);
+        } else {
+          return rangeListBuilder_.getMessageList();
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public int getRangeListCount() {
+        if (rangeListBuilder_ == null) {
+          return rangeList_.size();
+        } else {
+          return rangeListBuilder_.getCount();
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange getRangeList(int index) {
+        if (rangeListBuilder_ == null) {
+          return rangeList_.get(index);
+        } else {
+          return rangeListBuilder_.getMessage(index);
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public Builder setRangeList(
+          int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange value) {
+        if (rangeListBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureRangeListIsMutable();
+          rangeList_.set(index, value);
+          onChanged();
+        } else {
+          rangeListBuilder_.setMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public Builder setRangeList(
+          int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder builderForValue) {
+        if (rangeListBuilder_ == null) {
+          ensureRangeListIsMutable();
+          rangeList_.set(index, builderForValue.build());
+          onChanged();
+        } else {
+          rangeListBuilder_.setMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public Builder addRangeList(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange value) {
+        if (rangeListBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureRangeListIsMutable();
+          rangeList_.add(value);
+          onChanged();
+        } else {
+          rangeListBuilder_.addMessage(value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public Builder addRangeList(
+          int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange value) {
+        if (rangeListBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureRangeListIsMutable();
+          rangeList_.add(index, value);
+          onChanged();
+        } else {
+          rangeListBuilder_.addMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public Builder addRangeList(
+          org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder builderForValue) {
+        if (rangeListBuilder_ == null) {
+          ensureRangeListIsMutable();
+          rangeList_.add(builderForValue.build());
+          onChanged();
+        } else {
+          rangeListBuilder_.addMessage(builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public Builder addRangeList(
+          int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder builderForValue) {
+        if (rangeListBuilder_ == null) {
+          ensureRangeListIsMutable();
+          rangeList_.add(index, builderForValue.build());
+          onChanged();
+        } else {
+          rangeListBuilder_.addMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public Builder addAllRangeList(
+          java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange> values) {
+        if (rangeListBuilder_ == null) {
+          ensureRangeListIsMutable();
+          super.addAll(values, rangeList_);
+          onChanged();
+        } else {
+          rangeListBuilder_.addAllMessages(values);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public Builder clearRangeList() {
+        if (rangeListBuilder_ == null) {
+          rangeList_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000008);
+          onChanged();
+        } else {
+          rangeListBuilder_.clear();
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public Builder removeRangeList(int index) {
+        if (rangeListBuilder_ == null) {
+          ensureRangeListIsMutable();
+          rangeList_.remove(index);
+          onChanged();
+        } else {
+          rangeListBuilder_.remove(index);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder getRangeListBuilder(
+          int index) {
+        return getRangeListFieldBuilder().getBuilder(index);
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder getRangeListOrBuilder(
+          int index) {
+        if (rangeListBuilder_ == null) {
+          return rangeList_.get(index);  } else {
+          return rangeListBuilder_.getMessageOrBuilder(index);
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder>
+           getRangeListOrBuilderList() {
+        if (rangeListBuilder_ != null) {
+          return rangeListBuilder_.getMessageOrBuilderList();
+        } else {
+          return java.util.Collections.unmodifiableList(rangeList_);
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder addRangeListBuilder() {
+        return getRangeListFieldBuilder().addBuilder(
+            org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder addRangeListBuilder(
+          int index) {
+        return getRangeListFieldBuilder().addBuilder(
+            index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .hbase.pb.RowRange range_list = 4;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder>
+           getRangeListBuilderList() {
+        return getRangeListFieldBuilder().getBuilderList();
+      }
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder>
+          getRangeListFieldBuilder() {
+        if (rangeListBuilder_ == null) {
+          rangeListBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRange.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.RowRangeOrBuilder>(
+                  rangeList_,
+                  ((bitField0_ & 0x00000008) == 0x00000008),
+                  getParentForChildren(),
+                  isClean());
+          rangeList_ = null;
+        }
+        return rangeListBuilder_;
+      }
+
+      // @@protoc_insertion_point(builder_scope:hbase.pb.SQLPredicatePushDownColumnFilter)
+    }
+
+    static {
+      defaultInstance = new SQLPredicatePushDownColumnFilter(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:hbase.pb.SQLPredicatePushDownColumnFilter)
+  }
+
+  public interface SQLPredicatePushDownFilterOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;
+    /**
+     * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+     */
+    java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter>
+        getColumnFilterListList();
+    /**
+     * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter getColumnFilterList(int index);
+    /**
+     * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+     */
+    int getColumnFilterListCount();
+    /**
+     * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+     */
+    java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder>
+        getColumnFilterListOrBuilderList();
+    /**
+     * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder getColumnFilterListOrBuilder(
+        int index);
+  }
+  /**
+   * Protobuf type {@code hbase.pb.SQLPredicatePushDownFilter}
+   */
+  public static final class SQLPredicatePushDownFilter extends
+      com.google.protobuf.GeneratedMessage
+      implements SQLPredicatePushDownFilterOrBuilder {
+    // Use SQLPredicatePushDownFilter.newBuilder() to construct.
+    private SQLPredicatePushDownFilter(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private SQLPredicatePushDownFilter(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final SQLPredicatePushDownFilter defaultInstance;
+    public static SQLPredicatePushDownFilter getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public SQLPredicatePushDownFilter getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private SQLPredicatePushDownFilter(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 10: {
+              if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+                columnFilterList_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter>();
+                mutable_bitField0_ |= 0x00000001;
+              }
+              columnFilterList_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.PARSER, extensionRegistry));
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+          columnFilterList_ = java.util.Collections.unmodifiableList(columnFilterList_);
+        }
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownFilter_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.class, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<SQLPredicatePushDownFilter> PARSER =
+        new com.google.protobuf.AbstractParser<SQLPredicatePushDownFilter>() {
+      public SQLPredicatePushDownFilter parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new SQLPredicatePushDownFilter(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<SQLPredicatePushDownFilter> getParserForType() {
+      return PARSER;
+    }
+
+    // repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;
+    public static final int COLUMN_FILTER_LIST_FIELD_NUMBER = 1;
+    private java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter> columnFilterList_;
+    /**
+     * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+     */
+    public java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter> getColumnFilterListList() {
+      return columnFilterList_;
+    }
+    /**
+     * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+     */
+    public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder>
+        getColumnFilterListOrBuilderList() {
+      return columnFilterList_;
+    }
+    /**
+     * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+     */
+    public int getColumnFilterListCount() {
+      return columnFilterList_.size();
+    }
+    /**
+     * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter getColumnFilterList(int index) {
+      return columnFilterList_.get(index);
+    }
+    /**
+     * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder getColumnFilterListOrBuilder(
+        int index) {
+      return columnFilterList_.get(index);
+    }
+
+    private void initFields() {
+      columnFilterList_ = java.util.Collections.emptyList();
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      for (int i = 0; i < getColumnFilterListCount(); i++) {
+        if (!getColumnFilterList(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      for (int i = 0; i < columnFilterList_.size(); i++) {
+        output.writeMessage(1, columnFilterList_.get(i));
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      for (int i = 0; i < columnFilterList_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(1, columnFilterList_.get(i));
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter)) {
+        return super.equals(obj);
+      }
+      org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter other = (org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter) obj;
+
+      boolean result = true;
+      result = result && getColumnFilterListList()
+          .equals(other.getColumnFilterListList());
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (getColumnFilterListCount() > 0) {
+        hash = (37 * hash) + COLUMN_FILTER_LIST_FIELD_NUMBER;
+        hash = (53 * hash) + getColumnFilterListList().hashCode();
+      }
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code hbase.pb.SQLPredicatePushDownFilter}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilterOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownFilter_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.class, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.Builder.class);
+      }
+
+      // Construct using org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+          getColumnFilterListFieldBuilder();
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        if (columnFilterListBuilder_ == null) {
+          columnFilterList_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000001);
+        } else {
+          columnFilterListBuilder_.clear();
+        }
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor;
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter getDefaultInstanceForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.getDefaultInstance();
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter build() {
+        org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter buildPartial() {
+        org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter result = new org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter(this);
+        int from_bitField0_ = bitField0_;
+        if (columnFilterListBuilder_ == null) {
+          if (((bitField0_ & 0x00000001) == 0x00000001)) {
+            columnFilterList_ = java.util.Collections.unmodifiableList(columnFilterList_);
+            bitField0_ = (bitField0_ & ~0x00000001);
+          }
+          result.columnFilterList_ = columnFilterList_;
+        } else {
+          result.columnFilterList_ = columnFilterListBuilder_.build();
+        }
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter) {
+          return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter other) {
+        if (other == org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter.getDefaultInstance()) return this;
+        if (columnFilterListBuilder_ == null) {
+          if (!other.columnFilterList_.isEmpty()) {
+            if (columnFilterList_.isEmpty()) {
+              columnFilterList_ = other.columnFilterList_;
+              bitField0_ = (bitField0_ & ~0x00000001);
+            } else {
+              ensureColumnFilterListIsMutable();
+              columnFilterList_.addAll(other.columnFilterList_);
+            }
+            onChanged();
+          }
+        } else {
+          if (!other.columnFilterList_.isEmpty()) {
+            if (columnFilterListBuilder_.isEmpty()) {
+              columnFilterListBuilder_.dispose();
+              columnFilterListBuilder_ = null;
+              columnFilterList_ = other.columnFilterList_;
+              bitField0_ = (bitField0_ & ~0x00000001);
+              columnFilterListBuilder_ =
+                com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+                   getColumnFilterListFieldBuilder() : null;
+            } else {
+              columnFilterListBuilder_.addAllMessages(other.columnFilterList_);
+            }
+          }
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        for (int i = 0; i < getColumnFilterListCount(); i++) {
+          if (!getColumnFilterList(i).isInitialized()) {
+
+            return false;
+          }
+        }
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownFilter) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;
+      private java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter> columnFilterList_ =
+        java.util.Collections.emptyList();
+      private void ensureColumnFilterListIsMutable() {
+        if (!((bitField0_ & 0x00000001) == 0x00000001)) {
+          columnFilterList_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter>(columnFilterList_);
+          bitField0_ |= 0x00000001;
+         }
+      }
+
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder> columnFilterListBuilder_;
+
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter> getColumnFilterListList() {
+        if (columnFilterListBuilder_ == null) {
+          return java.util.Collections.unmodifiableList(columnFilterList_);
+        } else {
+          return columnFilterListBuilder_.getMessageList();
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public int getColumnFilterListCount() {
+        if (columnFilterListBuilder_ == null) {
+          return columnFilterList_.size();
+        } else {
+          return columnFilterListBuilder_.getCount();
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter getColumnFilterList(int index) {
+        if (columnFilterListBuilder_ == null) {
+          return columnFilterList_.get(index);
+        } else {
+          return columnFilterListBuilder_.getMessage(index);
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public Builder setColumnFilterList(
+          int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter value) {
+        if (columnFilterListBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureColumnFilterListIsMutable();
+          columnFilterList_.set(index, value);
+          onChanged();
+        } else {
+          columnFilterListBuilder_.setMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public Builder setColumnFilterList(
+          int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder builderForValue) {
+        if (columnFilterListBuilder_ == null) {
+          ensureColumnFilterListIsMutable();
+          columnFilterList_.set(index, builderForValue.build());
+          onChanged();
+        } else {
+          columnFilterListBuilder_.setMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public Builder addColumnFilterList(org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter value) {
+        if (columnFilterListBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureColumnFilterListIsMutable();
+          columnFilterList_.add(value);
+          onChanged();
+        } else {
+          columnFilterListBuilder_.addMessage(value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public Builder addColumnFilterList(
+          int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter value) {
+        if (columnFilterListBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureColumnFilterListIsMutable();
+          columnFilterList_.add(index, value);
+          onChanged();
+        } else {
+          columnFilterListBuilder_.addMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public Builder addColumnFilterList(
+          org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder builderForValue) {
+        if (columnFilterListBuilder_ == null) {
+          ensureColumnFilterListIsMutable();
+          columnFilterList_.add(builderForValue.build());
+          onChanged();
+        } else {
+          columnFilterListBuilder_.addMessage(builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public Builder addColumnFilterList(
+          int index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder builderForValue) {
+        if (columnFilterListBuilder_ == null) {
+          ensureColumnFilterListIsMutable();
+          columnFilterList_.add(index, builderForValue.build());
+          onChanged();
+        } else {
+          columnFilterListBuilder_.addMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public Builder addAllColumnFilterList(
+          java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter> values) {
+        if (columnFilterListBuilder_ == null) {
+          ensureColumnFilterListIsMutable();
+          super.addAll(values, columnFilterList_);
+          onChanged();
+        } else {
+          columnFilterListBuilder_.addAllMessages(values);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public Builder clearColumnFilterList() {
+        if (columnFilterListBuilder_ == null) {
+          columnFilterList_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000001);
+          onChanged();
+        } else {
+          columnFilterListBuilder_.clear();
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public Builder removeColumnFilterList(int index) {
+        if (columnFilterListBuilder_ == null) {
+          ensureColumnFilterListIsMutable();
+          columnFilterList_.remove(index);
+          onChanged();
+        } else {
+          columnFilterListBuilder_.remove(index);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder getColumnFilterListBuilder(
+          int index) {
+        return getColumnFilterListFieldBuilder().getBuilder(index);
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder getColumnFilterListOrBuilder(
+          int index) {
+        if (columnFilterListBuilder_ == null) {
+          return columnFilterList_.get(index);  } else {
+          return columnFilterListBuilder_.getMessageOrBuilder(index);
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder>
+           getColumnFilterListOrBuilderList() {
+        if (columnFilterListBuilder_ != null) {
+          return columnFilterListBuilder_.getMessageOrBuilderList();
+        } else {
+          return java.util.Collections.unmodifiableList(columnFilterList_);
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder addColumnFilterListBuilder() {
+        return getColumnFilterListFieldBuilder().addBuilder(
+            org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder addColumnFilterListBuilder(
+          int index) {
+        return getColumnFilterListFieldBuilder().addBuilder(
+            index, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .hbase.pb.SQLPredicatePushDownColumnFilter column_filter_list = 1;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder>
+           getColumnFilterListBuilderList() {
+        return getColumnFilterListFieldBuilder().getBuilderList();
+      }
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder>
+          getColumnFilterListFieldBuilder() {
+        if (columnFilterListBuilder_ == null) {
+          columnFilterListBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilter.Builder, org.apache.hadoop.hbase.protobuf.generated.FilterProtos.SQLPredicatePushDownColumnFilterOrBuilder>(
+                  columnFilterList_,
+                  ((bitField0_ & 0x00000001) == 0x00000001),
+                  getParentForChildren(),
+                  isClean());
+          columnFilterList_ = null;
+        }
+        return columnFilterListBuilder_;
+      }
+
+      // @@protoc_insertion_point(builder_scope:hbase.pb.SQLPredicatePushDownFilter)
+    }
+
+    static {
+      defaultInstance = new SQLPredicatePushDownFilter(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:hbase.pb.SQLPredicatePushDownFilter)
+  }
+
   private static com.google.protobuf.Descriptors.Descriptor
     internal_static_hbase_pb_Filter_descriptor;
   private static
@@ -17450,6 +19241,16 @@ public final class FilterProtos {
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_hbase_pb_MultiRowRangeFilter_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_hbase_pb_SQLPredicatePushDownFilter_fieldAccessorTable;
 
   public static com.google.protobuf.Descriptors.FileDescriptor
       getDescriptor() {
@@ -17512,9 +19313,15 @@ public final class FilterProtos {
       "ow\030\001 \001(\014\022\033\n\023start_row_inclusive\030\002 \001(\010\022\020\n" +
       "\010stop_row\030\003 \001(\014\022\032\n\022stop_row_inclusive\030\004 " +
       "\001(\010\"A\n\023MultiRowRangeFilter\022*\n\016row_range_" +
-      "list\030\001 \003(\0132\022.hbase.pb.RowRangeBB\n*org.ap" +
-      "ache.hadoop.hbase.protobuf.generatedB\014Fi" +
-      "lterProtosH\001\210\001\001\240\001\001"
+      "list\030\001 \003(\0132\022.hbase.pb.RowRange\"\214\001\n SQLPr" +
+      "edicatePushDownColumnFilter\022\025\n\rcolumn_fa" +
+      "mily\030\001 \002(\014\022\021\n\tqualifier\030\002 \002(\014\022\026\n\016get_poi" +
+      "nt_list\030\003 \003(\014\022&\n\nrange_list\030\004 \003(\0132\022.hbas" +
+      "e.pb.RowRange\"d\n\032SQLPredicatePushDownFil" +
+      "ter\022F\n\022column_filter_list\030\001 \003(\0132*.hbase." +
+      "pb.SQLPredicatePushDownColumnFilterBB\n*o",
+      "rg.apache.hadoop.hbase.protobuf.generate" +
+      "dB\014FilterProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -17701,6 +19508,18 @@ public final class FilterProtos {
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_MultiRowRangeFilter_descriptor,
               new java.lang.String[] { "RowRangeList", });
+          internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor =
+            getDescriptor().getMessageTypes().get(30);
+          internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_hbase_pb_SQLPredicatePushDownColumnFilter_descriptor,
+              new java.lang.String[] { "ColumnFamily", "Qualifier", "GetPointList", "RangeList", });
+          internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor =
+            getDescriptor().getMessageTypes().get(31);
+          internal_static_hbase_pb_SQLPredicatePushDownFilter_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor,
+              new java.lang.String[] { "ColumnFilterList", });
           return null;
         }
       };
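
For orientation: the hunk above is the protoc-generated code for the two new filter messages, ending with the descriptor wiring for SQLPredicatePushDownColumnFilter and SQLPredicatePushDownFilter. Below is a minimal, hypothetical sketch of driving that generated API from client code; the class name, the placeholder family/qualifier bytes, and the setColumnFamily/setQualifier accessors (assumed from protoc's standard naming for required bytes fields) are illustration only and not part of this patch.

  import com.google.protobuf.ByteString;
  import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;

  public class PushDownFilterRoundTrip {
    public static void main(String[] args) throws Exception {
      // Build one column filter; column_family and qualifier are required fields.
      FilterProtos.SQLPredicatePushDownColumnFilter columnFilter =
          FilterProtos.SQLPredicatePushDownColumnFilter.newBuilder()
              .setColumnFamily(ByteString.copyFromUtf8("c"))
              .setQualifier(ByteString.copyFromUtf8("a"))
              .build();

      // Wrap it in the top-level filter message.
      FilterProtos.SQLPredicatePushDownFilter filter =
          FilterProtos.SQLPredicatePushDownFilter.newBuilder()
              .addColumnFilterList(columnFilter)
              .build();

      // Round trip through the wire format handled by the nested PARSER.
      byte[] bytes = filter.toByteArray();
      FilterProtos.SQLPredicatePushDownFilter reparsed =
          FilterProtos.SQLPredicatePushDownFilter.parseFrom(bytes);
      System.out.println("column filters: " + reparsed.getColumnFilterListCount());
    }
  }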

http://git-wip-us.apache.org/repos/asf/hbase/blob/e95358a7/hbase-protocol/src/main/protobuf/Filter.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Filter.proto b/hbase-protocol/src/main/protobuf/Filter.proto
index 67d5717..b2dc7d0 100644
--- a/hbase-protocol/src/main/protobuf/Filter.proto
+++ b/hbase-protocol/src/main/protobuf/Filter.proto
@@ -167,4 +167,17 @@ message RowRange {
 
 message MultiRowRangeFilter {
   repeated RowRange row_range_list = 1;
+}
+
+
+message SQLPredicatePushDownColumnFilter {
+  required bytes column_family = 1;
+  required bytes qualifier = 2;
+  repeated bytes get_point_list = 3;
+  repeated RowRange range_list = 4;
+
+}
+
+message SQLPredicatePushDownFilter {
+  repeated SQLPredicatePushDownColumnFilter column_filter_list = 1;
 }
\ No newline at end of file
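
In this schema, each SQLPredicatePushDownColumnFilter pairs a required column family and qualifier with a list of exact byte values (get_point_list) and a list of RowRange bounds (range_list), reusing the RowRange message already defined for MultiRowRangeFilter; SQLPredicatePushDownFilter is just the repeated collection of those per-column entries. A hedged sketch of populating one entry follows — the byte values are placeholders, and the addGetPointList/addRangeList and RowRange setter names are assumed from protoc's standard accessor naming for these fields:

  import com.google.protobuf.ByteString;
  import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;

  public class ColumnFilterSchemaSketch {
    public static FilterProtos.SQLPredicatePushDownColumnFilter sampleColumnFilter() {
      // A bounded range, expressed with the existing RowRange message.
      FilterProtos.RowRange range = FilterProtos.RowRange.newBuilder()
          .setStartRow(ByteString.copyFromUtf8("value-010"))
          .setStartRowInclusive(true)
          .setStopRow(ByteString.copyFromUtf8("value-020"))
          .setStopRowInclusive(false)
          .build();

      return FilterProtos.SQLPredicatePushDownColumnFilter.newBuilder()
          .setColumnFamily(ByteString.copyFromUtf8("c"))         // required
          .setQualifier(ByteString.copyFromUtf8("a"))            // required
          .addGetPointList(ByteString.copyFromUtf8("value-005")) // one exact value
          .addRangeList(range)                                   // one bounded range
          .build();
    }
  }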

http://git-wip-us.apache.org/repos/asf/hbase/blob/e95358a7/hbase-spark/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml
index e48f9e8..8110629 100644
--- a/hbase-spark/pom.xml
+++ b/hbase-spark/pom.xml
@@ -79,6 +79,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
         </dependency>
@@ -319,6 +325,12 @@
 
         <dependency>
             <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-protocol</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-hadoop-compat</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>