You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2016/03/09 19:43:45 UTC

hbase git commit: HBASE-14801 Enhance the Spark-HBase connector catalog with json format (Zhan Zhang)

Repository: hbase
Updated Branches:
  refs/heads/master ad9b91a90 -> 97cce850f


HBASE-14801 Enhance the Spark-HBase connector catalog with json format (Zhan Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/97cce850
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/97cce850
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/97cce850

Branch: refs/heads/master
Commit: 97cce850fed130aa263d61f6a3c4f361f2629c7c
Parents: ad9b91a
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Wed Mar 9 10:41:56 2016 -0800
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Wed Mar 9 10:41:56 2016 -0800

----------------------------------------------------------------------
 .../hbase/spark/SparkSQLPushDownFilter.java     |  18 +-
 .../hadoop/hbase/spark/DefaultSource.scala      | 370 +++++--------------
 .../spark/datasources/HBaseSparkConf.scala      |   5 +
 .../spark/datasources/HBaseTableScanRDD.scala   |  15 +-
 .../hadoop/hbase/spark/datasources/SerDes.scala |  46 +++
 .../hbase/DataTypeParserWrapper.scala           |  30 ++
 .../datasources/hbase/HBaseTableCatalog.scala   | 339 +++++++++++++++++
 .../hadoop/hbase/spark/DefaultSourceSuite.scala | 153 +++++---
 .../hadoop/hbase/spark/HBaseCatalogSuite.scala  | 111 ++++++
 9 files changed, 752 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/97cce850/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
index c3fd25c..057853f 100644
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
@@ -27,8 +27,10 @@ import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.spark.protobuf.generated.FilterProtos;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.sql.datasources.hbase.Field;
 import scala.collection.mutable.MutableList;
 
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -66,7 +68,7 @@ public class SparkSQLPushDownFilter extends FilterBase{
 
   public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression,
                                 byte[][] valueFromQueryArray,
-                                MutableList<SchemaQualifierDefinition> columnDefinitions) {
+                                MutableList<Field> fields) {
     this.dynamicLogicExpression = dynamicLogicExpression;
     this.valueFromQueryArray = valueFromQueryArray;
 
@@ -74,12 +76,12 @@ public class SparkSQLPushDownFilter extends FilterBase{
     this.currentCellToColumnIndexMap =
             new HashMap<>();
 
-    for (int i = 0; i < columnDefinitions.size(); i++) {
-      SchemaQualifierDefinition definition = columnDefinitions.get(i).get();
+    for (int i = 0; i < fields.size(); i++) {
+      Field field = fields.apply(i);
 
+      byte[] cfBytes = field.cfBytes();
       ByteArrayComparable familyByteComparable =
-              new ByteArrayComparable(definition.columnFamilyBytes(),
-                      0, definition.columnFamilyBytes().length);
+          new ByteArrayComparable(cfBytes, 0, cfBytes.length);
 
       HashMap<ByteArrayComparable, String> qualifierIndexMap =
               currentCellToColumnIndexMap.get(familyByteComparable);
@@ -88,11 +90,11 @@ public class SparkSQLPushDownFilter extends FilterBase{
         qualifierIndexMap = new HashMap<>();
         currentCellToColumnIndexMap.put(familyByteComparable, qualifierIndexMap);
       }
+      byte[] qBytes = field.colBytes();
       ByteArrayComparable qualifierByteComparable =
-              new ByteArrayComparable(definition.qualifierBytes(), 0,
-                      definition.qualifierBytes().length);
+          new ByteArrayComparable(qBytes, 0, qBytes.length);
 
-      qualifierIndexMap.put(qualifierByteComparable, definition.columnName());
+      qualifierIndexMap.put(qualifierByteComparable, field.colName());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/97cce850/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 844b5b5..97a8e9e 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, SimplePositione
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.datasources.hbase.{Field, HBaseTableCatalog}
+import org.apache.spark.sql.types.{DataType => SparkDataType}
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -48,13 +49,6 @@ import scala.collection.mutable
  *   Through the HBase Bytes object commands.
  */
 class DefaultSource extends RelationProvider with Logging {
-
-  val TABLE_KEY:String = "hbase.table"
-  val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping"
-  val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources"
-  val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context"
-  val PUSH_DOWN_COLUMN_FILTER:String = "hbase.push.down.column.filter"
-
   /**
    * Is given input from SparkSQL to construct a BaseRelation
    * @param sqlContext SparkSQL context
@@ -64,87 +58,26 @@ class DefaultSource extends RelationProvider with Logging {
   override def createRelation(sqlContext: SQLContext,
                               parameters: Map[String, String]):
   BaseRelation = {
-
-
-    val tableName = parameters.get(TABLE_KEY)
-    if (tableName.isEmpty)
-      new IllegalArgumentException("Invalid value for " + TABLE_KEY +" '" + tableName + "'")
-
-    val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "")
-    val hbaseConfigResources = parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "")
-    val useHBaseReources = parameters.getOrElse(USE_HBASE_CONTEXT, "true")
-    val usePushDownColumnFilter = parameters.getOrElse(PUSH_DOWN_COLUMN_FILTER, "true")
-
-    new HBaseRelation(tableName.get,
-      generateSchemaMappingMap(schemaMappingString),
-      hbaseConfigResources,
-      useHBaseReources.equalsIgnoreCase("true"),
-      usePushDownColumnFilter.equalsIgnoreCase("true"),
-      parameters)(sqlContext)
-  }
-
-  /**
-   * Reads the SCHEMA_COLUMNS_MAPPING_KEY and converts it to a map of
-   * SchemaQualifierDefinitions with the original sql column name as the key
-   * @param schemaMappingString The schema mapping string from the SparkSQL map
-   * @return                    A map of definitions keyed by the SparkSQL column name
-   */
-  def generateSchemaMappingMap(schemaMappingString:String):
-  java.util.HashMap[String, SchemaQualifierDefinition] = {
-    try {
-      val columnDefinitions = schemaMappingString.split(',')
-      val resultingMap = new java.util.HashMap[String, SchemaQualifierDefinition]()
-      columnDefinitions.map(cd => {
-        val parts = cd.trim.split(' ')
-
-        //Make sure we get three parts
-        //<ColumnName> <ColumnType> <ColumnFamily:Qualifier>
-        if (parts.length == 3) {
-          val hbaseDefinitionParts = if (parts(2).charAt(0) == ':') {
-            Array[String]("", "key")
-          } else {
-            parts(2).split(':')
-          }
-          resultingMap.put(parts(0), new SchemaQualifierDefinition(parts(0),
-            parts(1), hbaseDefinitionParts(0), hbaseDefinitionParts(1)))
-        } else {
-          throw new IllegalArgumentException("Invalid value for schema mapping '" + cd +
-            "' should be '<columnName> <columnType> <columnFamily>:<qualifier>' " +
-            "for columns and '<columnName> <columnType> :<qualifier>' for rowKeys")
-        }
-      })
-      resultingMap
-    } catch {
-      case e:Exception => throw
-        new IllegalArgumentException("Invalid value for " + SCHEMA_COLUMNS_MAPPING_KEY +
-          " '" + schemaMappingString + "'", e )
-    }
+    new HBaseRelation(parameters, None)(sqlContext)
   }
 }
 
 /**
  * Implementation of Spark BaseRelation that will build up our scan logic
  * , do the scan pruning, filter push down, and value conversions
- *
- * @param tableName               HBase table that we plan to read from
- * @param schemaMappingDefinition SchemaMapping information to map HBase
- *                                Qualifiers to SparkSQL columns
- * @param configResources         Optional comma separated list of config resources
- *                                to get based on their URI
- * @param useHBaseContext         If true this will look to see if
- *                                HBaseContext.latest is populated to use that
- *                                connection information
  * @param sqlContext              SparkSQL context
  */
-case class HBaseRelation (val tableName:String,
-                     val schemaMappingDefinition:
-                     java.util.HashMap[String, SchemaQualifierDefinition],
-                     val configResources:String,
-                     val useHBaseContext:Boolean,
-                     val usePushDownColumnFilter:Boolean,
-                     @transient parameters: Map[String, String] ) (
-  @transient val sqlContext:SQLContext)
+case class HBaseRelation (
+    @transient parameters: Map[String, String],
+    userSpecifiedSchema: Option[StructType]
+  )(@transient val sqlContext: SQLContext)
   extends BaseRelation with PrunedFilteredScan with Logging {
+  val catalog = HBaseTableCatalog(parameters)
+  def tableName = catalog.name
+  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
+  val useHBaseContext =  parameters.get(HBaseSparkConf.USE_HBASE_CONTEXT).map(_.toBoolean).getOrElse(true)
+  val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER)
+    .map(_.toBoolean).getOrElse(true)
 
   // The user supplied per table parameter will overwrite global ones in SparkConf
   val blockCacheEnable = parameters.get(HBaseSparkConf.BLOCK_CACHE_ENABLE).map(_.toBoolean)
@@ -176,33 +109,12 @@ case class HBaseRelation (val tableName:String,
   def hbaseConf = wrappedConf.value
 
   /**
-   * Generates a Spark SQL schema object so Spark SQL knows what is being
+   * Generates a Spark SQL schema objeparametersct so Spark SQL knows what is being
    * provided by this BaseRelation
    *
    * @return schema generated from the SCHEMA_COLUMNS_MAPPING_KEY value
    */
-  override def schema: StructType = {
-
-    val metadataBuilder = new MetadataBuilder()
-
-    val structFieldArray = new Array[StructField](schemaMappingDefinition.size())
-
-    val schemaMappingDefinitionIt = schemaMappingDefinition.values().iterator()
-    var indexCounter = 0
-    while (schemaMappingDefinitionIt.hasNext) {
-      val c = schemaMappingDefinitionIt.next()
-
-      val metadata = metadataBuilder.putString("name", c.columnName).build()
-      val structField =
-        new StructField(c.columnName, c.columnSparkSqlType, nullable = true, metadata)
-
-      structFieldArray(indexCounter) = structField
-      indexCounter += 1
-    }
-
-    val result = new StructType(structFieldArray)
-    result
-  }
+  override val schema: StructType = userSpecifiedSchema.getOrElse(catalog.toDataType)
 
   /**
    * Here we are building the functionality to populate the resulting RDD[Row]
@@ -218,7 +130,6 @@ case class HBaseRelation (val tableName:String,
    */
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
 
-
     val pushDownTuple = buildPushDownPredicatesResource(filters)
     val pushDownRowKeyFilter = pushDownTuple._1
     var pushDownDynamicLogicExpression = pushDownTuple._2
@@ -236,17 +147,13 @@ case class HBaseRelation (val tableName:String,
     logDebug("valueArray:                     " + valueArray.length)
 
     val requiredQualifierDefinitionList =
-      new mutable.MutableList[SchemaQualifierDefinition]
+      new mutable.MutableList[Field]
 
     requiredColumns.foreach( c => {
-      val definition = schemaMappingDefinition.get(c)
-      requiredQualifierDefinitionList += definition
+      val field = catalog.getField(c)
+      requiredQualifierDefinitionList += field
     })
 
-    //Create a local variable so that scala doesn't have to
-    // serialize the whole HBaseRelation Object
-    val serializableDefinitionMap = schemaMappingDefinition
-
     //retain the information for unit testing checks
     DefaultSourceStaticUtils.populateLatestExecutionRules(pushDownRowKeyFilter,
       pushDownDynamicLogicExpression)
@@ -258,8 +165,8 @@ case class HBaseRelation (val tableName:String,
     pushDownRowKeyFilter.points.foreach(p => {
       val get = new Get(p)
       requiredQualifierDefinitionList.foreach( d => {
-        if (d.columnFamilyBytes.length > 0)
-          get.addColumn(d.columnFamilyBytes, d.qualifierBytes)
+        if (d.isRowKey)
+          get.addColumn(d.cfBytes, d.colBytes)
       })
       getList.add(get)
     })
@@ -276,7 +183,7 @@ case class HBaseRelation (val tableName:String,
     var resultRDD: RDD[Row] = {
       val tmp = hRdd.map{ r =>
         Row.fromSeq(requiredColumns.map(c =>
-          DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r)))
+          DefaultSourceStaticUtils.getValue(catalog.getField(c), r)))
       }
       if (tmp.partitions.size > 0) {
         tmp
@@ -291,11 +198,10 @@ case class HBaseRelation (val tableName:String,
       scan.setBatch(batchNum)
       scan.setCaching(cacheSize)
       requiredQualifierDefinitionList.foreach( d =>
-        scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
+        scan.addColumn(d.cfBytes, d.colBytes))
 
       val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
-        Row.fromSeq(requiredColumns.map(c => DefaultSourceStaticUtils.getValue(c,
-          serializableDefinitionMap, r._2)))
+        Row.fromSeq(requiredColumns.map(c => DefaultSourceStaticUtils.getValue(catalog.getField(c), r._2)))
       })
       resultRDD=rdd
     }
@@ -337,74 +243,73 @@ case class HBaseRelation (val tableName:String,
     filter match {
 
       case EqualTo(attr, value) =>
-        val columnDefinition = schemaMappingDefinition.get(attr)
-        if (columnDefinition != null) {
-          if (columnDefinition.columnFamily.isEmpty) {
+        val field = catalog.getField(attr)
+        if (field != null) {
+          if (field.isRowKey) {
             parentRowKeyFilter.mergeIntersect(new RowKeyFilter(
-              DefaultSourceStaticUtils.getByteValue(attr,
-                schemaMappingDefinition, value.toString), null))
+              DefaultSourceStaticUtils.getByteValue(field,
+                value.toString), null))
           }
           val byteValue =
-            DefaultSourceStaticUtils.getByteValue(attr,
-              schemaMappingDefinition, value.toString)
+            DefaultSourceStaticUtils.getByteValue(field, value.toString)
           valueArray += byteValue
         }
         new EqualLogicExpression(attr, valueArray.length - 1, false)
       case LessThan(attr, value) =>
-        val columnDefinition = schemaMappingDefinition.get(attr)
-        if (columnDefinition != null) {
-          if (columnDefinition.columnFamily.isEmpty) {
+        val field = catalog.getField(attr)
+        if (field != null) {
+          if (field.isRowKey) {
             parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
-              new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
-                schemaMappingDefinition, value.toString), false,
+              new ScanRange(DefaultSourceStaticUtils.getByteValue(field,
+                value.toString), false,
                 new Array[Byte](0), true)))
           }
           val byteValue =
-            DefaultSourceStaticUtils.getByteValue(attr,
-              schemaMappingDefinition, value.toString)
+            DefaultSourceStaticUtils.getByteValue(catalog.getField(attr),
+              value.toString)
           valueArray += byteValue
         }
         new LessThanLogicExpression(attr, valueArray.length - 1)
       case GreaterThan(attr, value) =>
-        val columnDefinition = schemaMappingDefinition.get(attr)
-        if (columnDefinition != null) {
-          if (columnDefinition.columnFamily.isEmpty) {
+        val field = catalog.getField(attr)
+        if (field != null) {
+          if (field.isRowKey) {
             parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
-              new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
-                schemaMappingDefinition, value.toString), false)))
+              new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(field,
+                value.toString), false)))
           }
           val byteValue =
-            DefaultSourceStaticUtils.getByteValue(attr,
-              schemaMappingDefinition, value.toString)
+            DefaultSourceStaticUtils.getByteValue(field,
+              value.toString)
           valueArray += byteValue
         }
         new GreaterThanLogicExpression(attr, valueArray.length - 1)
       case LessThanOrEqual(attr, value) =>
-        val columnDefinition = schemaMappingDefinition.get(attr)
-        if (columnDefinition != null) {
-          if (columnDefinition.columnFamily.isEmpty) {
+        val field = catalog.getField(attr)
+        if (field != null) {
+          if (field.isRowKey) {
             parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
-              new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
-                schemaMappingDefinition, value.toString), true,
+              new ScanRange(DefaultSourceStaticUtils.getByteValue(field,
+                value.toString), true,
                 new Array[Byte](0), true)))
           }
           val byteValue =
-            DefaultSourceStaticUtils.getByteValue(attr,
-              schemaMappingDefinition, value.toString)
+            DefaultSourceStaticUtils.getByteValue(catalog.getField(attr),
+              value.toString)
           valueArray += byteValue
         }
         new LessThanOrEqualLogicExpression(attr, valueArray.length - 1)
       case GreaterThanOrEqual(attr, value) =>
-        val columnDefinition = schemaMappingDefinition.get(attr)
-        if (columnDefinition != null) {
-          if (columnDefinition.columnFamily.isEmpty) {
+        val field = catalog.getField(attr)
+        if (field != null) {
+          if (field.isRowKey) {
             parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
-              new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
-                schemaMappingDefinition, value.toString), true)))
+              new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(field,
+                value.toString), true)))
           }
           val byteValue =
-            DefaultSourceStaticUtils.getByteValue(attr,
-              schemaMappingDefinition, value.toString)
+            DefaultSourceStaticUtils.getByteValue(catalog.getField(attr),
+              value.toString)
           valueArray += byteValue
 
         }
@@ -436,32 +341,6 @@ case class HBaseRelation (val tableName:String,
 }
 
 /**
- * Construct to contains column data that spend SparkSQL and HBase
- *
- * @param columnName   SparkSQL column name
- * @param colType      SparkSQL column type
- * @param columnFamily HBase column family
- * @param qualifier    HBase qualifier name
- */
-case class SchemaQualifierDefinition(columnName:String,
-                          colType:String,
-                          columnFamily:String,
-                          qualifier:String) extends Serializable {
-  val columnFamilyBytes = Bytes.toBytes(columnFamily)
-  val qualifierBytes = Bytes.toBytes(qualifier)
-  val columnSparkSqlType:DataType = if (colType.equals("BOOLEAN")) BooleanType
-    else if (colType.equals("TINYINT")) IntegerType
-    else if (colType.equals("INT")) IntegerType
-    else if (colType.equals("BIGINT")) LongType
-    else if (colType.equals("FLOAT")) FloatType
-    else if (colType.equals("DOUBLE")) DoubleType
-    else if (colType.equals("STRING")) StringType
-    else if (colType.equals("TIMESTAMP")) TimestampType
-    else if (colType.equals("DECIMAL")) StringType
-    else throw new IllegalArgumentException("Unsupported column type :" + colType)
-}
-
-/**
  * Construct to contain a single scan ranges information.  Also
  * provide functions to merge with other scan ranges through AND
  * or OR operators
@@ -788,35 +667,6 @@ class ColumnFilterCollection {
     })
   }
 
-  /**
-   * This will collect all the filter information in a way that is optimized
-   * for the HBase filter commend.  Allowing the filter to be accessed
-   * with columnFamily and qualifier information
-   *
-   * @param schemaDefinitionMap Schema Map that will help us map the right filters
-   *                            to the correct columns
-   * @return                    HashMap oc column filters
-   */
-  def generateFamilyQualifiterFilterMap(schemaDefinitionMap:
-                                        java.util.HashMap[String,
-                                          SchemaQualifierDefinition]):
-  util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter] = {
-    val familyQualifierFilterMap =
-      new util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter]()
-
-    columnFilterMap.foreach( e => {
-      val definition = schemaDefinitionMap.get(e._1)
-      //Don't add rowKeyFilter
-      if (definition.columnFamilyBytes.size > 0) {
-        familyQualifierFilterMap.put(
-          new ColumnFamilyQualifierMapKeyWrapper(
-            definition.columnFamilyBytes, 0, definition.columnFamilyBytes.length,
-            definition.qualifierBytes, 0, definition.qualifierBytes.length), e._2)
-      }
-    })
-    familyQualifierFilterMap
-  }
-
   override def toString:String = {
     val strBuilder = new StringBuilder
     columnFilterMap.foreach( e => strBuilder.append(e))
@@ -836,7 +686,7 @@ object DefaultSourceStaticUtils {
   val rawDouble = new RawDouble
   val rawString = RawString.ASCENDING
 
-  val byteRange = new ThreadLocal[PositionedByteRange]{
+  val byteRange = new ThreadLocal[PositionedByteRange] {
     override def initialValue(): PositionedByteRange = {
       val range = new SimplePositionedMutableByteRange()
       range.setOffset(0)
@@ -844,11 +694,11 @@ object DefaultSourceStaticUtils {
     }
   }
 
-  def getFreshByteRange(bytes:Array[Byte]): PositionedByteRange = {
+  def getFreshByteRange(bytes: Array[Byte]): PositionedByteRange = {
     getFreshByteRange(bytes, 0, bytes.length)
   }
 
-  def getFreshByteRange(bytes:Array[Byte],  offset:Int = 0, length:Int):
+  def getFreshByteRange(bytes: Array[Byte], offset: Int = 0, length: Int):
   PositionedByteRange = {
     byteRange.get().set(bytes).setLength(length).setOffset(offset)
   }
@@ -867,7 +717,7 @@ object DefaultSourceStaticUtils {
    * @param dynamicLogicExpression The dynamicLogicExpression used in the last query
    */
   def populateLatestExecutionRules(rowKeyFilter: RowKeyFilter,
-                                   dynamicLogicExpression: DynamicLogicExpression):Unit = {
+                                   dynamicLogicExpression: DynamicLogicExpression): Unit = {
     lastFiveExecutionRules.add(new ExecutionRuleForUnitTesting(
       rowKeyFilter, dynamicLogicExpression))
     while (lastFiveExecutionRules.size() > 5) {
@@ -879,25 +729,16 @@ object DefaultSourceStaticUtils {
    * This method will convert the result content from HBase into the
    * SQL value type that is requested by the Spark SQL schema definition
    *
-   * @param columnName              The name of the SparkSQL Column
-   * @param schemaMappingDefinition The schema definition map
+   * @param field              The structure of the SparkSQL Column
    * @param r                       The result object from HBase
    * @return                        The converted object type
    */
-  def getValue(columnName: String,
-               schemaMappingDefinition:
-               java.util.HashMap[String, SchemaQualifierDefinition],
-               r: Result): Any = {
-
-    val columnDef = schemaMappingDefinition.get(columnName)
-
-    if (columnDef == null) throw new IllegalArgumentException("Unknown column:" + columnName)
-
-
-    if (columnDef.columnFamilyBytes.isEmpty) {
+  def getValue(field: Field,
+      r: Result): Any = {
+    if (field.isRowKey) {
       val row = r.getRow
 
-      columnDef.columnSparkSqlType match {
+      field.dt match {
         case IntegerType => rawInteger.decode(getFreshByteRange(row))
         case LongType => rawLong.decode(getFreshByteRange(row))
         case FloatType => rawFloat.decode(getFreshByteRange(row))
@@ -908,9 +749,9 @@ object DefaultSourceStaticUtils {
       }
     } else {
       val cellByteValue =
-        r.getColumnLatestCell(columnDef.columnFamilyBytes, columnDef.qualifierBytes)
+        r.getColumnLatestCell(field.cfBytes, field.colBytes)
       if (cellByteValue == null) null
-      else columnDef.columnSparkSqlType match {
+      else field.dt match {
         case IntegerType => rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray,
           cellByteValue.getValueOffset, cellByteValue.getValueLength))
         case LongType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
@@ -933,52 +774,41 @@ object DefaultSourceStaticUtils {
    * This will convert the value from SparkSQL to be stored into HBase using the
    * right byte Type
    *
-   * @param columnName              SparkSQL column name
-   * @param schemaMappingDefinition Schema definition map
    * @param value                   String value from SparkSQL
    * @return                        Returns the byte array to go into HBase
    */
-  def getByteValue(columnName: String,
-                   schemaMappingDefinition:
-                   java.util.HashMap[String, SchemaQualifierDefinition],
-                   value: String): Array[Byte] = {
-
-    val columnDef = schemaMappingDefinition.get(columnName)
-
-    if (columnDef == null) {
-      throw new IllegalArgumentException("Unknown column:" + columnName)
-    } else {
-      columnDef.columnSparkSqlType match {
-        case IntegerType =>
-          val result = new Array[Byte](Bytes.SIZEOF_INT)
-          val localDataRange = getFreshByteRange(result)
-          rawInteger.encode(localDataRange, value.toInt)
-          localDataRange.getBytes
-        case LongType =>
-          val result = new Array[Byte](Bytes.SIZEOF_LONG)
-          val localDataRange = getFreshByteRange(result)
-          rawLong.encode(localDataRange, value.toLong)
-          localDataRange.getBytes
-        case FloatType =>
-          val result = new Array[Byte](Bytes.SIZEOF_FLOAT)
-          val localDataRange = getFreshByteRange(result)
-          rawFloat.encode(localDataRange, value.toFloat)
-          localDataRange.getBytes
-        case DoubleType =>
-          val result = new Array[Byte](Bytes.SIZEOF_DOUBLE)
-          val localDataRange = getFreshByteRange(result)
-          rawDouble.encode(localDataRange, value.toDouble)
-          localDataRange.getBytes
-        case StringType =>
-          Bytes.toBytes(value)
-        case TimestampType =>
-          val result = new Array[Byte](Bytes.SIZEOF_LONG)
-          val localDataRange = getFreshByteRange(result)
-          rawLong.encode(localDataRange, value.toLong)
-          localDataRange.getBytes
-
-        case _ => Bytes.toBytes(value)
-      }
+  def getByteValue(field: Field,
+      value: String): Array[Byte] = {
+    field.dt match {
+      case IntegerType =>
+        val result = new Array[Byte](Bytes.SIZEOF_INT)
+        val localDataRange = getFreshByteRange(result)
+        rawInteger.encode(localDataRange, value.toInt)
+        localDataRange.getBytes
+      case LongType =>
+        val result = new Array[Byte](Bytes.SIZEOF_LONG)
+        val localDataRange = getFreshByteRange(result)
+        rawLong.encode(localDataRange, value.toLong)
+        localDataRange.getBytes
+      case FloatType =>
+        val result = new Array[Byte](Bytes.SIZEOF_FLOAT)
+        val localDataRange = getFreshByteRange(result)
+        rawFloat.encode(localDataRange, value.toFloat)
+        localDataRange.getBytes
+      case DoubleType =>
+        val result = new Array[Byte](Bytes.SIZEOF_DOUBLE)
+        val localDataRange = getFreshByteRange(result)
+        rawDouble.encode(localDataRange, value.toDouble)
+        localDataRange.getBytes
+      case StringType =>
+        Bytes.toBytes(value)
+      case TimestampType =>
+        val result = new Array[Byte](Bytes.SIZEOF_LONG)
+        val localDataRange = getFreshByteRange(result)
+        rawLong.encode(localDataRange, value.toLong)
+        localDataRange.getBytes
+
+      case _ => Bytes.toBytes(value)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/97cce850/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index 5e11356..ca44d42 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -31,4 +31,9 @@ object HBaseSparkConf{
   val defaultBatchNum = 1000
   val BULKGET_SIZE = "spark.hbase.bulkGetSize"
   val defaultBulkGetSize = 1000
+
+  val HBASE_CONFIG_RESOURCES_LOCATIONS = "hbase.config.resources"
+  val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
+  val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
+  val defaultPushDownColumnFilter = true
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/97cce850/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index d859957..2e05651 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.spark._
 import org.apache.hadoop.hbase.spark.hbase._
 import org.apache.hadoop.hbase.spark.datasources.HBaseResources._
+import org.apache.spark.sql.datasources.hbase.Field
 import org.apache.spark.{SparkEnv, TaskContext, Logging, Partition}
 import org.apache.spark.rdd.RDD
 
@@ -31,7 +32,7 @@ import scala.collection.mutable
 class HBaseTableScanRDD(relation: HBaseRelation,
                        val hbaseContext: HBaseContext,
                        @transient val filter: Option[SparkSQLPushDownFilter] = None,
-                       val columns: Seq[SchemaQualifierDefinition] = Seq.empty
+                        val columns: Seq[Field] = Seq.empty
      )extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging  {
   private def sparkConf = SparkEnv.get.conf
   @transient var ranges = Seq.empty[Range]
@@ -98,15 +99,15 @@ class HBaseTableScanRDD(relation: HBaseRelation,
       tbr: TableResource,
       g: Seq[Array[Byte]],
       filter: Option[SparkSQLPushDownFilter],
-      columns: Seq[SchemaQualifierDefinition],
+      columns: Seq[Field],
       hbaseContext: HBaseContext): Iterator[Result] = {
     g.grouped(relation.bulkGetSize).flatMap{ x =>
       val gets = new ArrayList[Get]()
       x.foreach{ y =>
         val g = new Get(y)
         columns.foreach { d =>
-          if (d.columnFamilyBytes.length > 0) {
-            g.addColumn(d.columnFamilyBytes, d.qualifierBytes)
+          if (!d.isRowKey) {
+            g.addColumn(d.cfBytes, d.colBytes)
           }
         }
         filter.foreach(g.setFilter(_))
@@ -149,7 +150,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
 
   private def buildScan(range: Range,
       filter: Option[SparkSQLPushDownFilter],
-      columns: Seq[SchemaQualifierDefinition]): Scan = {
+      columns: Seq[Field]): Scan = {
     val scan = (range.lower, range.upper) match {
       case (Some(Bound(a, b)), Some(Bound(c, d))) => new Scan(a, c)
       case (None, Some(Bound(c, d))) => new Scan(Array[Byte](), c)
@@ -158,8 +159,8 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     }
 
     columns.foreach { d =>
-      if (d.columnFamilyBytes.length > 0) {
-        scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)
+      if (!d.isRowKey) {
+        scan.addColumn(d.cfBytes, d.colBytes)
       }
     }
     scan.setCacheBlocks(relation.blockCacheEnable)

http://git-wip-us.apache.org/repos/asf/hbase/blob/97cce850/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala
new file mode 100644
index 0000000..a19099b
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+import java.io.ByteArrayInputStream
+
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Type._
+import org.apache.avro.generic.GenericDatumReader
+import org.apache.avro.generic.GenericDatumWriter
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.output.ByteArrayOutputStream
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.sql.types._
+
+trait SerDes {
+  def serialize(value: Any): Array[Byte]
+  def deserialize(bytes: Array[Byte], start: Int, end: Int): Any
+}
+
+class DoubleSerDes extends SerDes {
+  override def serialize(value: Any): Array[Byte] = Bytes.toBytes(value.asInstanceOf[Double])
+  override def deserialize(bytes: Array[Byte], start: Int, end: Int): Any = {
+    Bytes.toDouble(bytes, start)
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/97cce850/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala
new file mode 100644
index 0000000..1e56a3d
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.datasources.hbase
+
+import org.apache.spark.sql.catalyst.SqlLexical
+import org.apache.spark.sql.catalyst.util.DataTypeParser
+import org.apache.spark.sql.types.DataType
+
+object DataTypeParserWrapper {
+  lazy val dataTypeParser = new DataTypeParser {
+    override val lexical = new SqlLexical
+  }
+
+  def parse(dataTypeString: String): DataType = dataTypeParser.toDataType(dataTypeString)
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/97cce850/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
new file mode 100644
index 0000000..103fb90
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.datasources.hbase
+
+import org.apache.hadoop.hbase.spark.datasources._
+import org.apache.hadoop.hbase.spark.hbase._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.util.DataTypeParser
+import org.apache.spark.sql.types._
+import org.json4s.jackson.JsonMethods._
+
+import scala.collection.mutable
+
+// Due the access issue defined in spark, we have to locate the file in this package.
+// The definition of each column cell, which may be composite type
+// TODO: add avro support
+case class Field(
+    colName: String,
+    cf: String,
+    col: String,
+    sType: Option[String] = None,
+    avroSchema: Option[String] = None,
+    serdes: Option[SerDes]= None,
+    len: Int = -1) extends Logging {
+  override def toString = s"$colName $cf $col"
+  val isRowKey = cf == HBaseTableCatalog.rowKey
+  var start: Int = _
+
+  def cfBytes: Array[Byte] = {
+    if (isRowKey) {
+      Bytes.toBytes("")
+    } else {
+      Bytes.toBytes(cf)
+    }
+  }
+  def colBytes: Array[Byte] = {
+    if (isRowKey) {
+      Bytes.toBytes("key")
+    } else {
+      Bytes.toBytes(col)
+    }
+  }
+
+  val dt = {
+    sType.map(DataTypeParser.parse(_)).get
+  }
+
+  var length: Int = {
+    if (len == -1) {
+      dt match {
+        case BinaryType | StringType => -1
+        case BooleanType => Bytes.SIZEOF_BOOLEAN
+        case ByteType => 1
+        case DoubleType => Bytes.SIZEOF_DOUBLE
+        case FloatType => Bytes.SIZEOF_FLOAT
+        case IntegerType => Bytes.SIZEOF_INT
+        case LongType => Bytes.SIZEOF_LONG
+        case ShortType => Bytes.SIZEOF_SHORT
+        case _ => -1
+      }
+    } else {
+      len
+    }
+
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case that: Field =>
+      colName == that.colName && cf == that.cf && col == that.col
+    case _ => false
+  }
+}
+
+// The row key definition, with each key refer to the col defined in Field, e.g.,
+// key1:key2:key3
+case class RowKey(k: String) {
+  val keys = k.split(":")
+  var fields: Seq[Field] = _
+  var varLength = false
+  def length = {
+    if (varLength) {
+      -1
+    } else {
+      fields.foldLeft(0){case (x, y) =>
+        x + y.length
+      }
+    }
+  }
+}
+// The map between the column presented to Spark and the HBase field
+case class SchemaMap(map: mutable.HashMap[String, Field]) {
+  def toFields = map.map { case (name, field) =>
+    StructField(name, field.dt)
+  }.toSeq
+
+  def fields = map.values
+
+  def getField(name: String) = map(name)
+}
+
+
+// The definition of HBase and Relation relation schema
+case class HBaseTableCatalog(
+     namespace: String,
+     name: String,
+     row: RowKey,
+     sMap: SchemaMap,
+     numReg: Int) extends Logging {
+  def toDataType = StructType(sMap.toFields)
+  def getField(name: String) = sMap.getField(name)
+  def getRowKey: Seq[Field] = row.fields
+  def getPrimaryKey= row.keys(0)
+  def getColumnFamilies = {
+    sMap.fields.map(_.cf).filter(_ != HBaseTableCatalog.rowKey)
+  }
+
+  // Setup the start and length for each dimension of row key at runtime.
+  def dynSetupRowKey(rowKey: HBaseType) {
+    logDebug(s"length: ${rowKey.length}")
+    if(row.varLength) {
+      var start = 0
+      row.fields.foreach { f =>
+        logDebug(s"start: $start")
+        f.start = start
+        f.length = {
+          // If the length is not defined
+          if (f.length == -1) {
+            f.dt match {
+              case StringType =>
+                var pos = rowKey.indexOf(HBaseTableCatalog.delimiter, start)
+                if (pos == -1 || pos > rowKey.length) {
+                  // this is at the last dimension
+                  pos = rowKey.length
+                }
+                pos - start
+              // We don't know the length, assume it extend to the end of the rowkey.
+              case _ => rowKey.length - start
+            }
+          } else {
+            f.length
+          }
+        }
+        start += f.length
+      }
+    }
+  }
+
+  def initRowKey = {
+    val fields = sMap.fields.filter(_.cf == HBaseTableCatalog.rowKey)
+    row.fields = row.keys.flatMap(n => fields.find(_.col == n))
+    // The length is determined at run time if it is string or binary and the length is undefined.
+    if (row.fields.filter(_.length == -1).isEmpty) {
+      var start = 0
+      row.fields.foreach { f =>
+        f.start = start
+        start += f.length
+      }
+    } else {
+      row.varLength = true
+    }
+  }
+  initRowKey
+}
+
+object HBaseTableCatalog {
+  val newTable = "newtable"
+  // The json string specifying hbase catalog information
+  val tableCatalog = "catalog"
+  // The row key with format key1:key2 specifying table row key
+  val rowKey = "rowkey"
+  // The key for hbase table whose value specify namespace and table name
+  val table = "table"
+  // The namespace of hbase table
+  val nameSpace = "namespace"
+  // The name of hbase table
+  val tableName = "name"
+  // The name of columns in hbase catalog
+  val columns = "columns"
+  val cf = "cf"
+  val col = "col"
+  val `type` = "type"
+  // the name of avro schema json string
+  val avro = "avro"
+  val delimiter: Byte = 0
+  val serdes = "serdes"
+  val length = "length"
+
+  /**
+    * User provide table schema definition
+    * {"tablename":"name", "rowkey":"key1:key2",
+    * "columns":{"col1":{"cf":"cf1", "col":"col1", "type":"type1"},
+    * "col2":{"cf":"cf2", "col":"col2", "type":"type2"}}}
+    * Note that any col in the rowKey, there has to be one corresponding col defined in columns
+    */
+  def apply(params: Map[String, String]): HBaseTableCatalog = {
+    val parameters = convert(params)
+    //  println(jString)
+    val jString = parameters(tableCatalog)
+    val map = parse(jString).values.asInstanceOf[Map[String, _]]
+    val tableMeta = map.get(table).get.asInstanceOf[Map[String, _]]
+    val nSpace = tableMeta.get(nameSpace).getOrElse("default").asInstanceOf[String]
+    val tName = tableMeta.get(tableName).get.asInstanceOf[String]
+    val cIter = map.get(columns).get.asInstanceOf[Map[String, Map[String, String]]].toIterator
+    val schemaMap = mutable.HashMap.empty[String, Field]
+    cIter.foreach { case (name, column) =>
+      val sd = {
+        column.get(serdes).asInstanceOf[Option[String]].map(n =>
+          Class.forName(n).newInstance().asInstanceOf[SerDes]
+        )
+      }
+      val len = column.get(length).map(_.toInt).getOrElse(-1)
+      val sAvro = column.get(avro).map(parameters(_))
+      val f = Field(name, column.getOrElse(cf, rowKey),
+        column.get(col).get,
+        column.get(`type`),
+        sAvro, sd, len)
+      schemaMap.+=((name, f))
+    }
+    val numReg = parameters.get(newTable).map(x => x.toInt).getOrElse(0)
+    val rKey = RowKey(map.get(rowKey).get.asInstanceOf[String])
+    HBaseTableCatalog(nSpace, tName, rKey, SchemaMap(schemaMap), numReg)
+  }
+
+  val TABLE_KEY: String = "hbase.table"
+  val SCHEMA_COLUMNS_MAPPING_KEY: String = "hbase.columns.mapping"
+
+  /* for backward compatibility. Convert the old definition to new json based definition formated as below
+    val catalog = s"""{
+                      |"table":{"namespace":"default", "name":"htable"},
+                      |"rowkey":"key1:key2",
+                      |"columns":{
+                      |"col1":{"cf":"rowkey", "col":"key1", "type":"string"},
+                      |"col2":{"cf":"rowkey", "col":"key2", "type":"double"},
+                      |"col3":{"cf":"cf1", "col":"col2", "type":"binary"},
+                      |"col4":{"cf":"cf1", "col":"col3", "type":"timestamp"},
+                      |"col5":{"cf":"cf1", "col":"col4", "type":"double", "serdes":"${classOf[DoubleSerDes].getName}"},
+                      |"col6":{"cf":"cf1", "col":"col5", "type":"$map"},
+                      |"col7":{"cf":"cf1", "col":"col6", "type":"$array"},
+                      |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"}
+                      |}
+                      |}""".stripMargin
+   */
+  @deprecated("Please use new json format to define HBaseCatalog")
+  def convert(parameters: Map[String, String]): Map[String, String] = {
+    val tableName = parameters.get(TABLE_KEY).getOrElse(null)
+    // if the hbase.table is not defined, we assume it is json format already.
+    if (tableName == null) return parameters
+    val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "")
+    import scala.collection.JavaConverters._
+    val schemaMap = generateSchemaMappingMap(schemaMappingString).asScala.map(_._2.asInstanceOf[SchemaQualifierDefinition])
+
+    val rowkey = schemaMap.filter {
+      _.columnFamily == "rowkey"
+    }.map(_.columnName)
+    val cols = schemaMap.map { x =>
+      s""""${x.columnName}":{"cf":"${x.columnFamily}", "col":"${x.qualifier}", "type":"${x.colType}"}""".stripMargin
+    }
+    val jsonCatalog =
+      s"""{
+         |"table":{"namespace":"default", "name":"${tableName}"},
+         |"rowkey":"${rowkey.mkString(":")}",
+         |"columns":{
+         |${cols.mkString(",")}
+         |}
+         |}
+       """.stripMargin
+    parameters ++ Map(HBaseTableCatalog.tableCatalog->jsonCatalog)
+  }
+
+  /**
+    * Reads the SCHEMA_COLUMNS_MAPPING_KEY and converts it to a map of
+    * SchemaQualifierDefinitions with the original sql column name as the key
+    *
+    * @param schemaMappingString The schema mapping string from the SparkSQL map
+    * @return                    A map of definitions keyed by the SparkSQL column name
+    */
+  def generateSchemaMappingMap(schemaMappingString:String):
+  java.util.HashMap[String, SchemaQualifierDefinition] = {
+    println(schemaMappingString)
+    try {
+      val columnDefinitions = schemaMappingString.split(',')
+      val resultingMap = new java.util.HashMap[String, SchemaQualifierDefinition]()
+      columnDefinitions.map(cd => {
+        val parts = cd.trim.split(' ')
+
+        //Make sure we get three parts
+        //<ColumnName> <ColumnType> <ColumnFamily:Qualifier>
+        if (parts.length == 3) {
+          val hbaseDefinitionParts = if (parts(2).charAt(0) == ':') {
+            Array[String]("rowkey", parts(0))
+          } else {
+            parts(2).split(':')
+          }
+          resultingMap.put(parts(0), new SchemaQualifierDefinition(parts(0),
+            parts(1), hbaseDefinitionParts(0), hbaseDefinitionParts(1)))
+        } else {
+          throw new IllegalArgumentException("Invalid value for schema mapping '" + cd +
+            "' should be '<columnName> <columnType> <columnFamily>:<qualifier>' " +
+            "for columns and '<columnName> <columnType> :<qualifier>' for rowKeys")
+        }
+      })
+      resultingMap
+    } catch {
+      case e:Exception => throw
+        new IllegalArgumentException("Invalid value for " + SCHEMA_COLUMNS_MAPPING_KEY +
+          " '" +
+          schemaMappingString + "'", e )
+    }
+  }
+}
+
+/**
+  * Construct to contains column data that spend SparkSQL and HBase
+  *
+  * @param columnName   SparkSQL column name
+  * @param colType      SparkSQL column type
+  * @param columnFamily HBase column family
+  * @param qualifier    HBase qualifier name
+  */
+case class SchemaQualifierDefinition(columnName:String,
+    colType:String,
+    columnFamily:String,
+    qualifier:String)

http://git-wip-us.apache.org/repos/asf/hbase/blob/97cce850/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 04dd9ba..2987ec6 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.client.{Put, ConnectionFactory}
 import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{TableName, HBaseTestingUtility}
+import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
@@ -137,20 +138,37 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
       connection.close()
     }
 
+    def hbaseTable1Catalog = s"""{
+            |"table":{"namespace":"default", "name":"t1"},
+            |"rowkey":"key",
+            |"columns":{
+              |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
+              |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
+              |"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
+            |}
+          |}""".stripMargin
+
     new HBaseContext(sc, TEST_UTIL.getConfiguration)
     sqlContext = new SQLContext(sc)
 
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
-      Map("hbase.columns.mapping" ->
-        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
-        "hbase.table" -> "t1"))
+      Map(HBaseTableCatalog.tableCatalog->hbaseTable1Catalog))
 
     df.registerTempTable("hbaseTable1")
 
+    def hbaseTable2Catalog = s"""{
+            |"table":{"namespace":"default", "name":"t2"},
+            |"rowkey":"key",
+            |"columns":{
+              |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"int"},
+              |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
+              |"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
+            |}
+          |}""".stripMargin
+
+
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
-      Map("hbase.columns.mapping" ->
-        "KEY_FIELD INT :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
-        "hbase.table" -> "t2"))
+      Map(HBaseTableCatalog.tableCatalog->hbaseTable2Catalog))
 
     df.registerTempTable("hbaseTable2")
   }
@@ -512,13 +530,20 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(scanRange1.isUpperBoundEqualTo)
   }
 
-
   test("Test table that doesn't exist") {
+    val catalog = s"""{
+            |"table":{"namespace":"default", "name":"t1NotThere"},
+            |"rowkey":"key",
+            |"columns":{
+              |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
+              |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
+              |"B_FIELD":{"cf":"c", "col":"c", "type":"string"}
+            |}
+          |}""".stripMargin
+
     intercept[Exception] {
       df = sqlContext.load("org.apache.hadoop.hbase.spark",
-        Map("hbase.columns.mapping" ->
-          "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
-          "hbase.table" -> "t1NotThere"))
+        Map(HBaseTableCatalog.tableCatalog->catalog))
 
       df.registerTempTable("hbaseNonExistingTmp")
 
@@ -530,11 +555,20 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
   }
 
+
   test("Test table with column that doesn't exist") {
+    val catalog = s"""{
+            |"table":{"namespace":"default", "name":"t1"},
+            |"rowkey":"key",
+            |"columns":{
+              |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
+              |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
+              |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
+              |"C_FIELD":{"cf":"c", "col":"c", "type":"string"}
+            |}
+          |}""".stripMargin
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
-      Map("hbase.columns.mapping" ->
-        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, C_FIELD STRING c:c,",
-        "hbase.table" -> "t1"))
+      Map(HBaseTableCatalog.tableCatalog->catalog))
 
     df.registerTempTable("hbaseFactColumnTmp")
 
@@ -549,10 +583,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   }
 
   test("Test table with INT column") {
+    val catalog = s"""{
+            |"table":{"namespace":"default", "name":"t1"},
+            |"rowkey":"key",
+            |"columns":{
+              |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
+              |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
+              |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
+              |"I_FIELD":{"cf":"c", "col":"i", "type":"int"}
+            |}
+          |}""".stripMargin
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
-      Map("hbase.columns.mapping" ->
-        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD INT c:i,",
-        "hbase.table" -> "t1"))
+      Map(HBaseTableCatalog.tableCatalog->catalog))
 
     df.registerTempTable("hbaseIntTmp")
 
@@ -571,10 +613,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   }
 
   test("Test table with INT column defined at wrong type") {
+    val catalog = s"""{
+            |"table":{"namespace":"default", "name":"t1"},
+            |"rowkey":"key",
+            |"columns":{
+              |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
+              |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
+              |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
+              |"I_FIELD":{"cf":"c", "col":"i", "type":"string"}
+            |}
+          |}""".stripMargin
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
-      Map("hbase.columns.mapping" ->
-        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
-        "hbase.table" -> "t1"))
+      Map(HBaseTableCatalog.tableCatalog->catalog))
 
     df.registerTempTable("hbaseIntWrongTypeTmp")
 
@@ -594,32 +644,19 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(localResult(0).getString(2).charAt(3).toByte == 1)
   }
 
-  test("Test improperly formatted column mapping") {
-    intercept[IllegalArgumentException] {
-      df = sqlContext.load("org.apache.hadoop.hbase.spark",
-        Map("hbase.columns.mapping" ->
-          "KEY_FIELD,STRING,:key, A_FIELD,STRING,c:a, B_FIELD,STRING,c:b, I_FIELD,STRING,c:i,",
-          "hbase.table" -> "t1"))
-
-      df.registerTempTable("hbaseBadTmp")
-
-      val result = sqlContext.sql("SELECT KEY_FIELD, " +
-        "B_FIELD, I_FIELD FROM hbaseBadTmp")
-
-      val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-      assert(executionRules.dynamicLogicExpression == null)
-
-      result.take(5)
-    }
-  }
-
-
   test("Test bad column type") {
-    intercept[IllegalArgumentException] {
+    val catalog = s"""{
+            |"table":{"namespace":"default", "name":"t1"},
+            |"rowkey":"key",
+            |"columns":{
+              |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"FOOBAR"},
+              |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
+              |"I_FIELD":{"cf":"c", "col":"i", "type":"string"}
+            |}
+          |}""".stripMargin
+    intercept[Exception] {
       df = sqlContext.load("org.apache.hadoop.hbase.spark",
-        Map("hbase.columns.mapping" ->
-          "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
-          "hbase.table" -> "t1"))
+        Map(HBaseTableCatalog.tableCatalog->catalog))
 
       df.registerTempTable("hbaseIntWrongTypeTmp")
 
@@ -665,10 +702,18 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   }
 
   test("Test table with sparse column") {
+    val catalog = s"""{
+            |"table":{"namespace":"default", "name":"t1"},
+            |"rowkey":"key",
+            |"columns":{
+              |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
+              |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
+              |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
+              |"Z_FIELD":{"cf":"c", "col":"z", "type":"string"}
+            |}
+          |}""".stripMargin
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
-      Map("hbase.columns.mapping" ->
-        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, Z_FIELD STRING c:z,",
-        "hbase.table" -> "t1"))
+      Map(HBaseTableCatalog.tableCatalog->catalog))
 
     df.registerTempTable("hbaseZTmp")
 
@@ -688,11 +733,19 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   }
 
   test("Test with column logic disabled") {
+    val catalog = s"""{
+            |"table":{"namespace":"default", "name":"t1"},
+            |"rowkey":"key",
+            |"columns":{
+              |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
+              |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
+              |"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
+              |"Z_FIELD":{"cf":"c", "col":"z", "type":"string"}
+            |}
+          |}""".stripMargin
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
-      Map("hbase.columns.mapping" ->
-        "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b, Z_FIELD STRING c:z,",
-        "hbase.table" -> "t1",
-        "hbase.push.down.column.filter" -> "false"))
+      Map(HBaseTableCatalog.tableCatalog->catalog,
+        HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER -> "false"))
 
     df.registerTempTable("hbaseNoPushDownTmp")
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/97cce850/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
new file mode 100644
index 0000000..49e2f6c
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.spark.datasources.{DoubleSerDes, SerDes}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.Logging
+import org.apache.spark.sql.datasources.hbase.{DataTypeParserWrapper, HBaseTableCatalog}
+import org.apache.spark.sql.types._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
+
+  val map = s"""MAP<int, struct<varchar:string>>"""
+  val array = s"""array<struct<tinYint:tinyint>>"""
+  val arrayMap = s"""MAp<int, ARRAY<double>>"""
+  val catalog = s"""{
+                    |"table":{"namespace":"default", "name":"htable"},
+                    |"rowkey":"key1:key2",
+                    |"columns":{
+                    |"col1":{"cf":"rowkey", "col":"key1", "type":"string"},
+                    |"col2":{"cf":"rowkey", "col":"key2", "type":"double"},
+                    |"col3":{"cf":"cf1", "col":"col2", "type":"binary"},
+                    |"col4":{"cf":"cf1", "col":"col3", "type":"timestamp"},
+                    |"col5":{"cf":"cf1", "col":"col4", "type":"double", "serdes":"${classOf[DoubleSerDes].getName}"},
+                    |"col6":{"cf":"cf1", "col":"col5", "type":"$map"},
+                    |"col7":{"cf":"cf1", "col":"col6", "type":"$array"},
+                    |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"}
+                    |}
+                    |}""".stripMargin
+  val parameters = Map(HBaseTableCatalog.tableCatalog->catalog)
+  val t = HBaseTableCatalog(parameters)
+
+  def checkDataType(dataTypeString: String, expectedDataType: DataType): Unit = {
+    test(s"parse ${dataTypeString.replace("\n", "")}") {
+      assert(DataTypeParserWrapper.parse(dataTypeString) === expectedDataType)
+    }
+  }
+  test("basic") {
+    assert(t.getField("col1").isRowKey == true)
+    assert(t.getPrimaryKey == "key1")
+    assert(t.getField("col3").dt == BinaryType)
+    assert(t.getField("col4").dt == TimestampType)
+    assert(t.getField("col5").dt == DoubleType)
+    assert(t.getField("col5").serdes != None)
+    assert(t.getField("col4").serdes == None)
+    assert(t.getField("col1").isRowKey)
+    assert(t.getField("col2").isRowKey)
+    assert(!t.getField("col3").isRowKey)
+    assert(t.getField("col2").length == Bytes.SIZEOF_DOUBLE)
+    assert(t.getField("col1").length == -1)
+    assert(t.getField("col8").length == -1)
+  }
+
+  checkDataType(
+    map,
+    t.getField("col6").dt
+  )
+
+  checkDataType(
+    array,
+    t.getField("col7").dt
+  )
+
+  checkDataType(
+    arrayMap,
+    t.getField("col8").dt
+  )
+
+  test("convert") {
+    val m = Map("hbase.columns.mapping" ->
+      "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD DOUBLE c:b, C_FIELD BINARY c:c,",
+      "hbase.table" -> "t1")
+    val map = HBaseTableCatalog.convert(m)
+    val json = map.get(HBaseTableCatalog.tableCatalog).get
+    val parameters = Map(HBaseTableCatalog.tableCatalog->json)
+    val t = HBaseTableCatalog(parameters)
+    assert(t.getField("KEY_FIELD").isRowKey)
+    assert(DataTypeParserWrapper.parse("STRING") === t.getField("A_FIELD").dt)
+    assert(!t.getField("A_FIELD").isRowKey)
+    assert(DataTypeParserWrapper.parse("DOUBLE") === t.getField("B_FIELD").dt)
+    assert(DataTypeParserWrapper.parse("BINARY") === t.getField("C_FIELD").dt)
+  }
+
+  test("compatiblity") {
+    val m = Map("hbase.columns.mapping" ->
+      "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD DOUBLE c:b, C_FIELD BINARY c:c,",
+      "hbase.table" -> "t1")
+    val t = HBaseTableCatalog(m)
+    assert(t.getField("KEY_FIELD").isRowKey)
+    assert(DataTypeParserWrapper.parse("STRING") === t.getField("A_FIELD").dt)
+    assert(!t.getField("A_FIELD").isRowKey)
+    assert(DataTypeParserWrapper.parse("DOUBLE") === t.getField("B_FIELD").dt)
+    assert(DataTypeParserWrapper.parse("BINARY") === t.getField("C_FIELD").dt)
+  }
+}