Posted to commits@hbase.apache.org by bu...@apache.org on 2015/07/28 18:47:04 UTC

[2/3] hbase git commit: HBASE-13992 Integrate SparkOnHBase into HBase

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
new file mode 100644
index 0000000..4839892
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.function.VoidFunction
+import org.apache.spark.api.java.function.Function
+import org.apache.hadoop.hbase.client.Connection
+import org.apache.spark.streaming.api.java.JavaDStream
+import org.apache.spark.api.java.function.FlatMapFunction
+import scala.collection.JavaConversions._
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.client.Delete
+import org.apache.hadoop.hbase.client.Get
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import scala.reflect.ClassTag
+
+/**
+ * This is the Java wrapper over HBaseContext, which is written in
+ * Scala.  This class will be used by developers who want to
+ * work with Spark or Spark Streaming in Java.
+ *
+ * @param jsc    This is the JavaSparkContext that we will wrap
+ * @param config This is the config information for our HBase cluster
+ */
+class JavaHBaseContext(@transient jsc: JavaSparkContext,
+  @transient config: Configuration) extends Serializable {
+    val hbaseContext = new HBaseContext(jsc.sc, config)
+
+    /**
+     * A simple enrichment of the traditional Spark JavaRDD foreachPartition.
+     * This function differs from the original in that it offers the
+     * developer access to an already connected Connection object.
+     *
+     * Note: Do not close the Connection object.  All Connection
+     * management is handled outside this method.
+     *
+     * @param javaRdd Original JavaRDD with data to iterate over
+     * @param f       Function to be given an iterator to iterate through
+     *                the RDD values and a Connection object to interact
+     *                with HBase
+     */
+    def foreachPartition[T](javaRdd: JavaRDD[T],
+                            f: VoidFunction[(java.util.Iterator[T], Connection)] ) = {
+
+      hbaseContext.foreachPartition(javaRdd.rdd,
+        (it:Iterator[T], conn:Connection) =>
+        { f.call((it, conn)) })
+    }
+
+    /**
+     * A simple enrichment of the traditional Spark Streaming DStream
+     * foreachPartition.
+     * This function differs from the original in that it offers the
+     * developer access to an already connected Connection object.
+     *
+     * Note: Do not close the Connection object.  All Connection
+     * management is handled outside this method.
+     *
+     * @param javaDstream Original DStream with data to iterate over
+     * @param f           Function to be given an iterator to iterate through
+     *                    the JavaDStream values and a Connection object to
+     *                    interact with HBase
+     */
+    def foreachPartition[T](javaDstream: JavaDStream[T],
+                      f: VoidFunction[(Iterator[T], Connection)]) = {
+      hbaseContext.foreachPartition(javaDstream.dstream,
+        (it:Iterator[T], conn: Connection) => f.call((it, conn)))
+    }
+
+    /**
+     * A simple enrichment of the traditional Spark JavaRDD mapPartitions.
+     * This function differs from the original in that it offers the
+     * developer access to an already connected Connection object.
+     *
+     * Note: Do not close the Connection object.  All Connection
+     * management is handled outside this method.
+     *
+     * Note: Make sure to partition correctly to avoid memory issues when
+     *       getting data from HBase.
+     *
+     * @param javaRdd Original JavaRDD with data to iterate over
+     * @param f       Function to be given an iterator to iterate through
+     *                the RDD values and a Connection object to interact
+     *                with HBase
+     * @return        Returns a new RDD generated by the user-defined
+     *                function, just like a normal mapPartitions
+     */
+    def mapPartitions[T,R](javaRdd: JavaRDD[T],
+                          f: FlatMapFunction[(java.util.Iterator[T],
+                            Connection),R] ): JavaRDD[R] = {
+
+      def fn = (it: Iterator[T], conn: Connection) =>
+        asScalaIterator(
+          f.call((asJavaIterator(it), conn)).iterator()
+        )
+
+      JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
+        (iterator:Iterator[T], connection:Connection) =>
+          fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R])
+    }
+
+    /**
+     * A simple enrichment of the traditional Spark Streaming JavaDStream
+     * mapPartitions.
+     *
+     * This function differs from the original in that it offers the
+     * developer access to an already connected Connection object.
+     *
+     * Note: Do not close the Connection object.  All Connection
+     * management is handled outside this method.
+     *
+     * Note: Make sure to partition correctly to avoid memory issues when
+     *       getting data from HBase.
+     *
+     * @param javaDstream Original JavaDStream with data to iterate over
+     * @param mp          Function to be given an iterator to iterate through
+     *                    the JavaDStream values and a Connection object to
+     *                    interact with HBase
+     * @return            Returns a new JavaDStream generated by the
+     *                    user-defined function, just like a normal mapPartitions
+     */
+    def streamMap[T, U](javaDstream: JavaDStream[T],
+                        mp: Function[(Iterator[T], Connection), Iterator[U]]):
+    JavaDStream[U] = {
+      JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
+        (it: Iterator[T], conn: Connection) =>
+          mp.call((it, conn)) )(fakeClassTag[U]))(fakeClassTag[U])
+    }
+
+    /**
+     * A simple abstraction over the HBaseContext.foreachPartition method.
+     *
+     * It allows a user to take a JavaRDD, generate Puts, and
+     * send them to HBase.
+     * The complexity of managing the Connection is
+     * removed from the developer.
+     *
+     * @param javaRdd   Original JavaRDD with data to iterate over
+     * @param tableName The name of the table to put into
+     * @param f         Function to convert a value in the JavaRDD
+     *                  to an HBase Put
+     */
+    def bulkPut[T](javaRdd: JavaRDD[T],
+                   tableName: TableName,
+                   f: Function[T, Put]) {
+
+      hbaseContext.bulkPut(javaRdd.rdd, tableName, (t:T) => f.call(t))
+    }
+
+    /**
+     * A simple abstraction over the HBaseContext.streamBulkPut method.
+     *
+     * It allows a user to take a JavaDStream, generate Puts, and
+     * send them to HBase.
+     *
+     * The complexity of managing the Connection is
+     * removed from the developer.
+     *
+     * @param javaDstream Original DStream with data to iterate over
+     * @param tableName   The name of the table to put into
+     * @param f           Function to convert a value in
+     *                    the JavaDStream to an HBase Put
+     */
+    def streamBulkPut[T](javaDstream: JavaDStream[T],
+                         tableName: TableName,
+                         f: Function[T,Put]) = {
+      hbaseContext.streamBulkPut(javaDstream.dstream,
+        tableName,
+        (t:T) => f.call(t))
+    }
+
+    /**
+     * A simple abstraction over the HBaseContext.foreachPartition method.
+     *
+     * It allows a user to take a JavaRDD, generate Deletes, and
+     * send them to HBase.
+     *
+     * The complexity of managing the Connection is
+     * removed from the developer.
+     *
+     * @param javaRdd   Original JavaRDD with data to iterate over
+     * @param tableName The name of the table to delete from
+     * @param f         Function to convert a value in the JavaRDD to an
+     *                  HBase Delete
+     * @param batchSize The number of Deletes to batch before sending to HBase
+     */
+    def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
+                      f: Function[T, Delete], batchSize:Integer) {
+      hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t:T) => f.call(t), batchSize)
+    }
+
+    /**
+     * A simple abstraction over the HBaseContext.streamBulkDelete method.
+     *
+     * It allows a user to take a JavaDStream, generate Deletes, and
+     * send them to HBase.
+     *
+     * The complexity of managing the Connection is
+     * removed from the developer.
+     *
+     * @param javaDstream Original DStream with data to iterate over
+     * @param tableName   The name of the table to delete from
+     * @param f           Function to convert a value in the JavaDStream to an
+     *                    HBase Delete
+     * @param batchSize   The number of Deletes to be sent at once
+     */
+    def streamBulkDelete[T](javaDstream: JavaDStream[T],
+                            tableName: TableName,
+                            f: Function[T, Delete],
+                            batchSize: Integer) = {
+      hbaseContext.streamBulkDelete(javaDstream.dstream, tableName,
+        (t:T) => f.call(t),
+        batchSize)
+    }
+
+    /**
+     * A simple abstraction over the HBaseContext.bulkGet method.
+     *
+     * It allows a user to take a JavaRDD and generate a
+     * new RDD based on Gets and the results they bring back from HBase.
+     *
+     * @param tableName     The name of the table to get from
+     * @param batchSize     How many Gets to retrieve in a single fetch
+     * @param javaRdd       Original JavaRDD with data to iterate over
+     * @param makeGet       Function to convert a value in the JavaRDD to an
+     *                      HBase Get
+     * @param convertResult This will convert the HBase Result object to
+     *                      whatever the user wants to put in the resulting
+     *                      JavaRDD
+     * @return              New JavaRDD that is created by the Gets to HBase
+     */
+    def bulkGet[T, U](tableName: TableName,
+                      batchSize:Integer,
+                      javaRdd: JavaRDD[T],
+                      makeGet: Function[T, Get],
+                      convertResult: Function[Result, U]): JavaRDD[U] = {
+
+      JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
+        batchSize,
+        javaRdd.rdd,
+        (t:T) => makeGet.call(t),
+        (r:Result) => {convertResult.call(r)})(fakeClassTag[U]))(fakeClassTag[U])
+
+    }
+
+    /**
+     * A simple abstraction over the HBaseContext.streamBulkGet method.
+     *
+     * It allows a user to take a DStream and
+     * generate a new DStream based on Gets and the results
+     * they bring back from HBase.
+     *
+     * @param tableName     The name of the table to get from
+     * @param batchSize     The number of Gets to be batched together
+     * @param javaDStream   Original DStream with data to iterate over
+     * @param makeGet       Function to convert a value in the JavaDStream to an
+     *                      HBase Get
+     * @param convertResult This will convert the HBase Result object to
+     *                      whatever the user wants to put in the resulting
+     *                      JavaDStream
+     * @return              New JavaDStream that is created by the Gets to HBase
+     */
+    def streamBulkGet[T, U](tableName:TableName,
+                            batchSize:Integer,
+                            javaDStream: JavaDStream[T],
+                            makeGet: Function[T, Get],
+                            convertResult: Function[Result, U]): JavaDStream[U] = {
+      JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
+        batchSize,
+        javaDStream.dstream,
+        (t:T) => makeGet.call(t),
+        (r:Result) => convertResult.call(r) )(fakeClassTag[U]))(fakeClassTag[U])
+    }
+
+    /**
+     * This function will use the native HBase TableInputFormat with the
+     * given Scan object to generate a new JavaRDD.
+     *
+     *  @param tableName the name of the table to scan
+     *  @param scans     the HBase Scan object to use to read data from HBase
+     *  @param f         function to convert a Result object from HBase into
+     *                   what the user wants in the final generated JavaRDD
+     *  @return          new JavaRDD with results from the scan
+     */
+    def hbaseRDD[U](tableName: TableName,
+                    scans: Scan,
+                    f: Function[(ImmutableBytesWritable, Result), U]):
+    JavaRDD[U] = {
+      JavaRDD.fromRDD(
+        hbaseContext.hbaseRDD[U](tableName,
+          scans,
+          (v:(ImmutableBytesWritable, Result)) =>
+            f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U])
+    }
+
+    /**
+     * An overloaded version of HBaseContext hbaseRDD that defines the
+     * type of the resulting JavaRDD.
+     *
+     *  @param tableName the name of the table to scan
+     *  @param scans     the HBase Scan object to use to read data from HBase
+     *  @return          New JavaRDD with results from the scan
+     */
+    def hbaseRDD(tableName: TableName,
+                 scans: Scan):
+    JavaRDD[(ImmutableBytesWritable, Result)] = {
+      JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
+    }
+
+    /**
+     * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+     *
+     * This method is used to keep ClassTags out of the external Java API, as the Java compiler
+     * cannot produce them automatically. While this ClassTag-faking does please the compiler,
+     * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
+     *
+     * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, just worse
+     * performance or security issues. For instance, an Array[AnyRef] can hold any type T,
+     * but may lose primitive specialization.
+     */
+    private[spark]
+    def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+}
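
The class above is the Java-facing entry point, yet every worked example in this commit is written in Scala. As a rough sketch only (not part of this patch), calling bulkPut through JavaHBaseContext from Java might look like the following; the table name "t1", column family "c", and the "rowKey,value" record format are assumptions made purely for illustration.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class JavaHBaseBulkPutSketch {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkPutSketch");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    try {
      // Each record is "rowKey,value"; each becomes one Put.
      List<String> records = Arrays.asList("1,foo", "2,bar", "3,baz");
      JavaRDD<String> rdd = jsc.parallelize(records);

      Configuration conf = HBaseConfiguration.create();
      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      // Table "t1" and column family "c" are assumptions for this sketch.
      hbaseContext.bulkPut(rdd, TableName.valueOf("t1"),
          new Function<String, Put>() {
            @Override
            public Put call(String record) {
              String[] parts = record.split(",");
              Put put = new Put(Bytes.toBytes(parts[0]));
              put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"),
                  Bytes.toBytes(parts[1]));
              return put;
            }
          });
    } finally {
      jsc.stop();
    }
  }
}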

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
new file mode 100644
index 0000000..f77721f
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Delete
+import org.apache.spark.SparkConf
+
+/**
+ * This is a simple example of deleting records in HBase
+ * with the bulkDelete function.
+ */
+object HBaseBulkDeleteExample {
+  def main(args: Array[String]) {
+    if (args.length < 1) {
+      println("HBaseBulkDeletesExample {tableName} ")
+      return
+    }
+
+    val tableName = args(0)
+
+    val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName)
+    val sc = new SparkContext(sparkConf)
+    try {
+      //[Array[Byte]]
+      val rdd = sc.parallelize(Array(
+        Bytes.toBytes("1"),
+        Bytes.toBytes("2"),
+        Bytes.toBytes("3"),
+        Bytes.toBytes("4"),
+        Bytes.toBytes("5")
+      ))
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+      hbaseContext.bulkDelete[Array[Byte]](rdd,
+        TableName.valueOf(tableName),
+        deleteRecord => new Delete(deleteRecord),
+        4)
+    } finally {
+      sc.stop()
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala
new file mode 100644
index 0000000..88f52fb
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Get
+import org.apache.hadoop.hbase.client.Result
+import org.apache.spark.SparkConf
+
+/**
+ * This is a simple example of getting records in HBase
+ * with the bulkGet function.
+ */
+object HBaseBulkGetExample {
+  def main(args: Array[String]) {
+    if (args.length < 1) {
+      println("HBaseBulkGetExample {tableName}")
+      return
+    }
+
+    val tableName = args(0)
+
+    val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+
+      //[(Array[Byte])]
+      val rdd = sc.parallelize(Array(
+        Bytes.toBytes("1"),
+        Bytes.toBytes("2"),
+        Bytes.toBytes("3"),
+        Bytes.toBytes("4"),
+        Bytes.toBytes("5"),
+        Bytes.toBytes("6"),
+        Bytes.toBytes("7")))
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      val getRdd = hbaseContext.bulkGet[Array[Byte], String](
+        TableName.valueOf(tableName),
+        2,
+        rdd,
+        record => {
+          System.out.println("making Get")
+          new Get(record)
+        },
+        (result: Result) => {
+
+          val it = result.listCells().iterator()
+          val b = new StringBuilder
+
+          b.append(Bytes.toString(result.getRow) + ":")
+
+          while (it.hasNext) {
+            val cell = it.next()
+            val q = Bytes.toString(CellUtil.cloneQualifier(cell))
+            if (q.equals("counter")) {
+              b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
+            } else {
+              b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
+            }
+          }
+          b.toString()
+        })
+
+      getRdd.collect().foreach(v => println(v))
+
+    } finally {
+      sc.stop()
+    }
+  }
+}
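
The example above drives HBaseContext.bulkGet from Scala. A minimal sketch of the same lookup flow through the JavaHBaseContext wrapper, for Java users (not part of this patch; the table name "t1" and the row keys are illustrative assumptions), might look like this:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class JavaHBaseBulkGetSketch {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetSketch");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    try {
      // Row keys to look up; one Get is issued per element.
      JavaRDD<byte[]> rdd = jsc.parallelize(Arrays.asList(
          Bytes.toBytes("1"), Bytes.toBytes("2"), Bytes.toBytes("3")));

      Configuration conf = HBaseConfiguration.create();
      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      JavaRDD<String> resultRdd = hbaseContext.bulkGet(
          TableName.valueOf("t1"),   // assumed table name
          2,                         // batch size per fetch
          rdd,
          new Function<byte[], Get>() {
            @Override
            public Get call(byte[] rowKey) {
              return new Get(rowKey);
            }
          },
          new Function<Result, String>() {
            @Override
            public String call(Result result) {
              // Keep the conversion simple: return the row key as a String.
              return Bytes.toString(result.getRow());
            }
          });

      List<String> rows = resultRdd.collect();
      for (String row : rows) {
        System.out.println(row);
      }
    } finally {
      jsc.stop();
    }
  }
}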

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala
new file mode 100644
index 0000000..735efed
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Put
+import org.apache.spark.SparkConf
+
+/**
+ * This is a simple example of putting records in HBase
+ * with the bulkPut function.
+ */
+object HBaseBulkPutExample {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      println("HBaseBulkPutExample {tableName} {columnFamily}")
+      return
+    }
+
+    val tableName = args(0)
+    val columnFamily = args(1)
+
+    val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " +
+      tableName + " " + columnFamily)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+      //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])]
+      val rdd = sc.parallelize(Array(
+        (Bytes.toBytes("1"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
+        (Bytes.toBytes("2"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
+        (Bytes.toBytes("3"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
+        (Bytes.toBytes("4"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
+        (Bytes.toBytes("5"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
+      ))
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+      hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
+        TableName.valueOf(tableName),
+        (putRecord) => {
+          val put = new Put(putRecord._1)
+          putRecord._2.foreach((putValue) =>
+            put.addColumn(putValue._1, putValue._2, putValue._3))
+          put
+        });
+    } finally {
+      sc.stop()
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala
new file mode 100644
index 0000000..3fd3006
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.Text
+import org.apache.spark.SparkConf
+
+/**
+ * This is a simple example of putting records in HBase
+ * with the bulkPut function.  In this example we are
+ * getting the put information from a file
+ */
+object HBaseBulkPutExampleFromFile {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      println("HBaseBulkPutExampleFromFile {tableName} {columnFamily} {inputFile}")
+      return
+    }
+
+    val tableName = args(0)
+    val columnFamily = args(1)
+    val inputFile = args(2)
+
+    val sparkConf = new SparkConf().setAppName("HBaseBulkPutExampleFromFile " +
+      tableName + " " + columnFamily + " " + inputFile)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+      val rdd = sc.hadoopFile(
+        inputFile,
+        classOf[TextInputFormat],
+        classOf[LongWritable],
+        classOf[Text]).map(v => {
+        System.out.println("reading-" + v._2.toString)
+        v._2.toString
+      })
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+      hbaseContext.bulkPut[String](rdd,
+        TableName.valueOf(tableName),
+        (putRecord) => {
+          System.out.println("hbase-" + putRecord)
+          val put = new Put(Bytes.toBytes("Value- " + putRecord))
+          put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"),
+            Bytes.toBytes(putRecord.length()))
+          put
+        });
+    } finally {
+      sc.stop()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala
new file mode 100644
index 0000000..ae92f37
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Put
+import org.apache.spark.SparkConf
+
+/**
+ * This is a simple example of putting records in HBase
+ * with the bulkPut function.  In this example we are
+ * also setting the timestamp in the put
+ */
+object HBaseBulkPutTimestampExample {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.out.println("HBaseBulkPutTimestampExample {tableName} {columnFamily}")
+      return
+    }
+
+    val tableName = args(0)
+    val columnFamily = args(1)
+
+    val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " +
+      tableName + " " + columnFamily)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+
+      val rdd = sc.parallelize(Array(
+        (Bytes.toBytes("6"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
+        (Bytes.toBytes("7"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
+        (Bytes.toBytes("8"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
+        (Bytes.toBytes("9"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
+        (Bytes.toBytes("10"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))))
+
+      val conf = HBaseConfiguration.create()
+
+      val timeStamp = System.currentTimeMillis()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+      hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
+        TableName.valueOf(tableName),
+        (putRecord) => {
+          val put = new Put(putRecord._1)
+          putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2,
+            timeStamp, putValue._3))
+          put
+        })
+    } finally {
+      sc.stop()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala
new file mode 100644
index 0000000..852b198
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.spark.SparkConf
+/**
+ * This is a simple example of scanning records from HBase
+ * with the hbaseRDD function.
+ */
+object HBaseDistributedScanExample {
+  def main(args: Array[String]) {
+    if (args.length < 1) {
+      println("GenerateGraphs {tableName}")
+      return
+    }
+
+    val tableName = args(0)
+
+    val sparkConf = new SparkConf().setAppName("HBaseDistributedScanExample " + tableName )
+    val sc = new SparkContext(sparkConf)
+
+    try {
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      val scan = new Scan()
+      scan.setCaching(100)
+
+      val getRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
+
+      getRdd.foreach(v => println(Bytes.toString(v._1.get())))
+
+      println("Length: " + getRdd.map(r => r._1.copyBytes()).collect().length);
+
+        //.collect().foreach(v => println(Bytes.toString(v._1.get())))
+    } finally {
+      sc.stop()
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
new file mode 100644
index 0000000..29afa49
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Put
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.SparkConf
+
+/**
+ * This is a simple example of BulkPut with Spark Streaming
+ */
+object HBaseStreamingBulkPutExample {
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      println("HBaseStreamingBulkPutExample " +
+        "{host} {port} {tableName} {columnFamily}")
+      return
+    }
+
+    val host = args(0)
+    val port = args(1)
+    val tableName = args(2)
+    val columnFamily = args(3)
+
+    val sparkConf = new SparkConf().setAppName("HBaseStreamingBulkPutExample " +
+      tableName + " " + columnFamily)
+    val sc = new SparkContext(sparkConf)
+    try {
+      val ssc = new StreamingContext(sc, Seconds(1))
+
+      val lines = ssc.socketTextStream(host, port.toInt)
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      hbaseContext.streamBulkPut[String](lines,
+        TableName.valueOf(tableName),
+        (putRecord) => {
+          if (putRecord.length() > 0) {
+            val put = new Put(Bytes.toBytes(putRecord))
+            put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("foo"), Bytes.toBytes("bar"))
+            put
+          } else {
+            null
+          }
+        })
+      ssc.start()
+      ssc.awaitTerminationOrTimeout(60000)
+    } finally {
+      sc.stop()
+    }
+  }
+}
\ No newline at end of file
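
The streaming example above is Scala only. As a minimal sketch of the equivalent streamBulkPut call through JavaHBaseContext (not part of this patch; host "localhost", port 9999, table "t1", and column family "c" are assumptions for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class JavaHBaseStreamingBulkPutSketch {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseStreamingBulkPutSketch");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    try {
      JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(1));
      // Host and port are assumptions; any text stream source works.
      JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

      Configuration conf = HBaseConfiguration.create();
      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      hbaseContext.streamBulkPut(lines, TableName.valueOf("t1"),
          new Function<String, Put>() {
            @Override
            public Put call(String line) {
              if (line.length() == 0) {
                return null;  // skip empty lines, matching the Scala example above
              }
              Put put = new Put(Bytes.toBytes(line));
              put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("foo"), Bytes.toBytes("bar"));
              return put;
            }
          });

      jssc.start();
      jssc.awaitTerminationOrTimeout(60000);
    } finally {
      jsc.stop();
    }
  }
}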

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
new file mode 100644
index 0000000..b8f40a8
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.rdd
+
+import org.apache.hadoop.hbase.client.Delete
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.util.Bytes
+
+import org.apache.spark.{SparkContext, SparkConf}
+
+/**
+ * This is a simple example of deleting records in HBase
+ * with the bulkDelete function.
+ */
+object HBaseBulkDeleteExample {
+  def main(args: Array[String]) {
+    if (args.length < 1) {
+      println("HBaseBulkDeletesExample {tableName} ")
+      return
+    }
+
+    val tableName = args(0)
+
+    val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName)
+    val sc = new SparkContext(sparkConf)
+    try {
+      //[Array[Byte]]
+      val rdd = sc.parallelize(Array(
+        Bytes.toBytes("1"),
+        Bytes.toBytes("2"),
+        Bytes.toBytes("3"),
+        Bytes.toBytes("4"),
+        Bytes.toBytes("5")
+      ))
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      rdd.hbaseBulkDelete(hbaseContext, TableName.valueOf(tableName),
+        deleteRecord => new Delete(deleteRecord),
+        4)
+
+    } finally {
+      sc.stop()
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
new file mode 100644
index 0000000..9d59e96
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.rdd
+
+import org.apache.hadoop.hbase.client.{Result, Get}
+import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.spark.{SparkContext, SparkConf}
+
+/**
+ * This is a simple example of getting records in HBase
+ * with the bulkGet function.
+ */
+object HBaseBulkGetExample {
+  def main(args: Array[String]) {
+    if (args.length < 1) {
+      println("HBaseBulkGetExample {tableName}")
+      return
+    }
+
+    val tableName = args(0)
+
+    val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+
+      //[(Array[Byte])]
+      val rdd = sc.parallelize(Array(
+        Bytes.toBytes("1"),
+        Bytes.toBytes("2"),
+        Bytes.toBytes("3"),
+        Bytes.toBytes("4"),
+        Bytes.toBytes("5"),
+        Bytes.toBytes("6"),
+        Bytes.toBytes("7")))
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2,
+        record => {
+          System.out.println("making Get")
+          new Get(record)
+        },
+        (result: Result) => {
+
+          val it = result.listCells().iterator()
+          val b = new StringBuilder
+
+          b.append(Bytes.toString(result.getRow) + ":")
+
+          while (it.hasNext) {
+            val cell = it.next()
+            val q = Bytes.toString(CellUtil.cloneQualifier(cell))
+            if (q.equals("counter")) {
+              b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
+            } else {
+              b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
+            }
+          }
+          b.toString()
+        })
+
+      getRdd.collect().foreach(v => println(v))
+
+    } finally {
+      sc.stop()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
new file mode 100644
index 0000000..2d07e89
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.rdd
+
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+ * This is a simple example of putting records in HBase
+ * with the bulkPut function.
+ */
+object HBaseBulkPutExample {
+   def main(args: Array[String]) {
+     if (args.length < 2) {
+       println("HBaseBulkPutExample {tableName} {columnFamily}")
+       return
+     }
+
+     val tableName = args(0)
+     val columnFamily = args(1)
+
+     val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " +
+       tableName + " " + columnFamily)
+     val sc = new SparkContext(sparkConf)
+
+     try {
+       //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])]
+       val rdd = sc.parallelize(Array(
+         (Bytes.toBytes("1"),
+           Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
+         (Bytes.toBytes("2"),
+           Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
+         (Bytes.toBytes("3"),
+           Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
+         (Bytes.toBytes("4"),
+           Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
+         (Bytes.toBytes("5"),
+           Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
+       ))
+
+       val conf = HBaseConfiguration.create()
+
+       val hbaseContext = new HBaseContext(sc, conf)
+
+       rdd.hbaseBulkPut(hbaseContext, TableName.valueOf(tableName),
+         (putRecord) => {
+           val put = new Put(putRecord._1)
+           putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2,
+             putValue._3))
+           put
+         })
+
+     } finally {
+       sc.stop()
+     }
+   }
+ }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
new file mode 100644
index 0000000..e2ad224
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.rdd
+
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.{SparkContext, SparkConf}
+
+/**
+ * This is a simple example of using the foreachPartition
+ * method with a HBase connection
+ */
+object HBaseForeachPartitionExample {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      println("HBaseBulkPutExample {tableName} {columnFamily}")
+      return
+    }
+
+    val tableName = args(0)
+    val columnFamily = args(1)
+
+    val sparkConf = new SparkConf().setAppName("HBaseForeachPartitionExample " +
+      tableName + " " + columnFamily)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+      //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])]
+      val rdd = sc.parallelize(Array(
+        (Bytes.toBytes("1"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
+        (Bytes.toBytes("2"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
+        (Bytes.toBytes("3"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
+        (Bytes.toBytes("4"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
+        (Bytes.toBytes("5"),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
+      ))
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+
+      rdd.hbaseForeachPartition(hbaseContext,
+        (it, connection) => {
+          val m = connection.getBufferedMutator(TableName.valueOf(tableName))
+
+          it.foreach(r => {
+            val put = new Put(r._1)
+            r._2.foreach((putValue) =>
+              put.addColumn(putValue._1, putValue._2, putValue._3))
+            m.mutate(put)
+          })
+          m.flush()
+          m.close()
+        })
+
+    } finally {
+      sc.stop()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
new file mode 100644
index 0000000..bc444be
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.rdd
+
+import org.apache.hadoop.hbase.client.Get
+import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.{SparkContext, SparkConf}
+
+/**
+ * This is a simple example of using the mapPartitions
+ * method with a HBase connection
+ */
+object HBaseMapPartitionExample {
+  def main(args: Array[String]) {
+    if (args.length < 1) {
+      println("HBaseBulkGetExample {tableName}")
+      return
+    }
+
+    val tableName = args(0)
+
+    val sparkConf = new SparkConf().setAppName("HBaseMapPartitionExample " + tableName)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+
+      //[(Array[Byte])]
+      val rdd = sc.parallelize(Array(
+        Bytes.toBytes("1"),
+        Bytes.toBytes("2"),
+        Bytes.toBytes("3"),
+        Bytes.toBytes("4"),
+        Bytes.toBytes("5"),
+        Bytes.toBytes("6"),
+        Bytes.toBytes("7")))
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      val getRdd = rdd.hbaseMapPartitions[String](hbaseContext, (it, connection) => {
+        val table = connection.getTable(TableName.valueOf(tableName))
+        it.map{r =>
+          //batching would be faster.  This is just an example
+          val result = table.get(new Get(r))
+
+          val it = result.listCells().iterator()
+          val b = new StringBuilder
+
+          b.append(Bytes.toString(result.getRow) + ":")
+
+          while (it.hasNext) {
+            val cell = it.next()
+            // use CellUtil to copy just this cell's qualifier/value bytes; the
+            // raw backing arrays from getQualifierArray/getValueArray may hold
+            // more than this cell's data
+            val q = Bytes.toString(CellUtil.cloneQualifier(cell))
+            if (q.equals("counter")) {
+              b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
+            } else {
+              b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
+            }
+          }
+          b.toString()
+        }
+      })
+
+      getRdd.collect().foreach(v => println(v))
+
+    } finally {
+      sc.stop()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
new file mode 100644
index 0000000..f19ad10
--- /dev/null
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.junit.*;
+
+import scala.Tuple2;
+
+import com.google.common.io.Files;
+
+public class JavaHBaseContextSuite implements Serializable {
+  private transient JavaSparkContext jsc;
+  HBaseTestingUtility htu;
+  protected static final Log LOG = LogFactory.getLog(JavaHBaseContextSuite.class);
+
+
+  byte[] tableName = Bytes.toBytes("t1");
+  byte[] columnFamily = Bytes.toBytes("c");
+  String columnFamilyStr = Bytes.toString(columnFamily);
+
+  @Before
+  public void setUp() {
+    jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
+    jsc.addJar("spark.jar");
+
+    File tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
+
+    htu = HBaseTestingUtility.createLocalHTU();
+    try {
+      LOG.info("cleaning up test dir");
+
+      htu.cleanupTestDir();
+
+      LOG.info("starting minicluster");
+
+      htu.startMiniZKCluster();
+      htu.startMiniHBaseCluster(1, 1);
+
+      LOG.info(" - minicluster started");
+
+      try {
+        htu.deleteTable(TableName.valueOf(tableName));
+      } catch (Exception e) {
+        LOG.info(" - no table " + Bytes.toString(tableName) + " found");
+      }
+
+      LOG.info(" - creating table " + Bytes.toString(tableName));
+      htu.createTable(TableName.valueOf(tableName),
+              columnFamily);
+      LOG.info(" - created table");
+    } catch (Exception e1) {
+      throw new RuntimeException(e1);
+    }
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      htu.deleteTable(TableName.valueOf(tableName));
+      LOG.info("shuting down minicluster");
+      htu.shutdownMiniHBaseCluster();
+      htu.shutdownMiniZKCluster();
+      LOG.info(" - minicluster shut down");
+      htu.cleanupTestDir();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    jsc.stop();
+    jsc = null;
+  }
+
+  @Test
+  public void testBulkPut() throws IOException {
+
+    List<String> list = new ArrayList<>();
+    list.add("1," + columnFamilyStr + ",a,1");
+    list.add("2," + columnFamilyStr + ",a,2");
+    list.add("3," + columnFamilyStr + ",a,3");
+    list.add("4," + columnFamilyStr + ",a,4");
+    list.add("5," + columnFamilyStr + ",a,5");
+
+    JavaRDD<String> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    Connection conn = ConnectionFactory.createConnection(conf);
+    Table table = conn.getTable(TableName.valueOf(tableName));
+
+    try {
+      List<Delete> deletes = new ArrayList<>();
+      for (int i = 1; i < 6; i++) {
+        deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
+      }
+      table.delete(deletes);
+    } finally {
+      table.close();
+    }
+
+    hbaseContext.bulkPut(rdd,
+            TableName.valueOf(tableName),
+            new PutFunction());
+
+    table = conn.getTable(TableName.valueOf(tableName));
+
+    try {
+      Result result1 = table.get(new Get(Bytes.toBytes("1")));
+      Assert.assertNotNull("Row 1 should had been deleted", result1.getRow());
+
+      Result result2 = table.get(new Get(Bytes.toBytes("2")));
+      Assert.assertNotNull("Row 2 should had been deleted", result2.getRow());
+
+      Result result3 = table.get(new Get(Bytes.toBytes("3")));
+      Assert.assertNotNull("Row 3 should had been deleted", result3.getRow());
+
+      Result result4 = table.get(new Get(Bytes.toBytes("4")));
+      Assert.assertNotNull("Row 4 should had been deleted", result4.getRow());
+
+      Result result5 = table.get(new Get(Bytes.toBytes("5")));
+      Assert.assertNotNull("Row 5 should had been deleted", result5.getRow());
+    } finally {
+      table.close();
+      conn.close();
+    }
+  }
+
+  public static class PutFunction implements Function<String, Put> {
+
+    private static final long serialVersionUID = 1L;
+
+    public Put call(String v) throws Exception {
+      String[] cells = v.split(",");
+      Put put = new Put(Bytes.toBytes(cells[0]));
+
+      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
+              Bytes.toBytes(cells[3]));
+      return put;
+    }
+  }
+
+  @Test
+  public void testBulkDelete() throws IOException {
+    List<byte[]> list = new ArrayList<>();
+    list.add(Bytes.toBytes("1"));
+    list.add(Bytes.toBytes("2"));
+    list.add(Bytes.toBytes("3"));
+
+    JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
+            new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
+
+    try (
+            Connection conn = ConnectionFactory.createConnection(conf);
+            Table table = conn.getTable(TableName.valueOf(tableName))
+    ){
+      Result result1 = table.get(new Get(Bytes.toBytes("1")));
+      Assert.assertNull("Row 1 should had been deleted", result1.getRow());
+
+      Result result2 = table.get(new Get(Bytes.toBytes("2")));
+      Assert.assertNull("Row 2 should had been deleted", result2.getRow());
+
+      Result result3 = table.get(new Get(Bytes.toBytes("3")));
+      Assert.assertNull("Row 3 should had been deleted", result3.getRow());
+
+      Result result4 = table.get(new Get(Bytes.toBytes("4")));
+      Assert.assertNotNull("Row 4 should had been deleted", result4.getRow());
+
+      Result result5 = table.get(new Get(Bytes.toBytes("5")));
+      Assert.assertNotNull("Row 5 should had been deleted", result5.getRow());
+    }
+  }
+
+  @Test
+  public void testDistributedScan() throws IOException {
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    Scan scan = new Scan();
+    scan.setCaching(100);
+
+    JavaRDD<String> javaRdd =
+            hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
+                    .map(new ScanConvertFunction());
+
+    List<String> results = javaRdd.collect();
+
+    Assert.assertEquals(5, results.size());
+  }
+
+  private static class ScanConvertFunction implements
+          Function<Tuple2<ImmutableBytesWritable, Result>, String> {
+    @Override
+    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
+      return Bytes.toString(v1._1().copyBytes());
+    }
+  }
+
+  @Test
+  public void testBulkGet() throws IOException {
+    List<byte[]> list = new ArrayList<>();
+    list.add(Bytes.toBytes("1"));
+    list.add(Bytes.toBytes("2"));
+    list.add(Bytes.toBytes("3"));
+    list.add(Bytes.toBytes("4"));
+    list.add(Bytes.toBytes("5"));
+
+    JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    final JavaRDD<String> stringJavaRDD =
+            hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
+            new GetFunction(),
+            new ResultFunction());
+
+    Assert.assertEquals(5, stringJavaRDD.count());
+  }
+
+  public static class GetFunction implements Function<byte[], Get> {
+
+    private static final long serialVersionUID = 1L;
+
+    public Get call(byte[] v) throws Exception {
+      return new Get(v);
+    }
+  }
+
+  public static class ResultFunction implements Function<Result, String> {
+
+    private static final long serialVersionUID = 1L;
+
+    public String call(Result result) throws Exception {
+      Iterator<Cell> it = result.listCells().iterator();
+      StringBuilder b = new StringBuilder();
+
+      b.append(Bytes.toString(result.getRow())).append(":");
+
+      while (it.hasNext()) {
+        Cell cell = it.next();
+        String q = Bytes.toString(CellUtil.cloneQualifier(cell));
+        if ("counter".equals(q)) {
+          b.append("(")
+                  .append(q)
+                  .append(",")
+                  .append(Bytes.toLong(CellUtil.cloneValue(cell)))
+                  .append(")");
+        } else {
+          b.append("(")
+                  .append(q)
+                  .append(",")
+                  .append(Bytes.toString(CellUtil.cloneValue(cell)))
+                  .append(")");
+        }
+      }
+      return b.toString();
+    }
+  }
+
+  private void populateTableWithMockData(Configuration conf, TableName tableName)
+          throws IOException {
+    try (
+      Connection conn = ConnectionFactory.createConnection(conf);
+      Table table = conn.getTable(tableName)) {
+
+      List<Put> puts = new ArrayList<>();
+
+      for (int i = 1; i < 6; i++) {
+        Put put = new Put(Bytes.toBytes(Integer.toString(i)));
+        put.addColumn(columnFamily, columnFamily, columnFamily);
+        puts.add(put);
+      }
+      table.put(puts);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala
new file mode 100644
index 0000000..b27cfc7
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{ CellUtil, TableName, HBaseTestingUtility}
+import org.apache.spark.{SparkException, Logging, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+class HBaseContextSuite extends FunSuite with
+BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
+
+  @transient var sc: SparkContext = null
+  var TEST_UTIL = new HBaseTestingUtility
+
+  val tableName = "t1"
+  val columnFamily = "c"
+
+  override def beforeAll() {
+    TEST_UTIL.startMiniCluster()
+    logInfo(" - minicluster started")
+
+    try {
+      TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+    } catch {
+      case e: Exception =>
+        logInfo(" - no table " + tableName + " found")
+    }
+    logInfo(" - creating table " + tableName)
+    TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
+    logInfo(" - created table")
+
+    val envMap = Map[String,String](("Xmx", "512m"))
+
+    sc = new SparkContext("local", "test", null, Nil, envMap)
+  }
+
+  override def afterAll() {
+    logInfo("shuting down minicluster")
+    TEST_UTIL.shutdownMiniCluster()
+    logInfo(" - minicluster shut down")
+    TEST_UTIL.cleanupTestDir()
+    sc.stop()
+  }
+
+  test("bulkput to test HBase client") {
+    val config = TEST_UTIL.getConfiguration
+    val rdd = sc.parallelize(Array(
+      (Bytes.toBytes("1"),
+        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
+      (Bytes.toBytes("2"),
+        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))),
+      (Bytes.toBytes("3"),
+        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))),
+      (Bytes.toBytes("4"),
+        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))),
+      (Bytes.toBytes("5"),
+        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
+
+    val hbaseContext = new HBaseContext(sc, config)
+    hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
+      TableName.valueOf(tableName),
+      (putRecord) => {
+        val put = new Put(putRecord._1)
+        putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
+        put
+      })
+
+    val connection = ConnectionFactory.createConnection(config)
+    val table = connection.getTable(TableName.valueOf("t1"))
+
+    try {
+      val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a"))))
+      assert(foo1 == "foo1")
+
+      val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b"))))
+      assert(foo2 == "foo2")
+
+      val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c"))))
+      assert(foo3 == "foo3")
+
+      val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d"))))
+      assert(foo4 == "foo")
+
+      val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e"))))
+      assert(foo5 == "bar")
+
+    } finally {
+      table.close()
+      connection.close()
+    }
+  }
+
+  test("bulkDelete to test HBase client") {
+    val config = TEST_UTIL.getConfiguration
+    val connection = ConnectionFactory.createConnection(config)
+    val table = connection.getTable(TableName.valueOf("t1"))
+
+    try {
+      var put = new Put(Bytes.toBytes("delete1"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("delete2"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("delete3"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+      table.put(put)
+
+      val rdd = sc.parallelize(Array(
+        Bytes.toBytes("delete1"),
+        Bytes.toBytes("delete3")))
+
+      val hbaseContext = new HBaseContext(sc, config)
+      hbaseContext.bulkDelete[Array[Byte]](rdd,
+        TableName.valueOf(tableName),
+        putRecord => new Delete(putRecord),
+        4)
+
+      assert(table.get(new Get(Bytes.toBytes("delete1"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null)
+      assert(table.get(new Get(Bytes.toBytes("delete3"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null)
+      assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("delete2"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))).equals("foo2"))
+    } finally {
+      table.close()
+      connection.close()
+    }
+  }
+
+  test("bulkGet to test HBase client") {
+    val config = TEST_UTIL.getConfiguration
+    val connection = ConnectionFactory.createConnection(config)
+    val table = connection.getTable(TableName.valueOf("t1"))
+
+    try {
+      var put = new Put(Bytes.toBytes("get1"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("get2"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("get3"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+      table.put(put)
+    } finally {
+      table.close()
+      connection.close()
+    }
+    val rdd = sc.parallelize(Array(
+      Bytes.toBytes("get1"),
+      Bytes.toBytes("get2"),
+      Bytes.toBytes("get3"),
+      Bytes.toBytes("get4")))
+    val hbaseContext = new HBaseContext(sc, config)
+
+    val getRdd = hbaseContext.bulkGet[Array[Byte], String](
+      TableName.valueOf(tableName),
+      2,
+      rdd,
+      record => {
+        new Get(record)
+      },
+      (result: Result) => {
+        if (result.listCells() != null) {
+          val it = result.listCells().iterator()
+          val b = new StringBuilder
+
+          b.append(Bytes.toString(result.getRow) + ":")
+
+          while (it.hasNext) {
+            val cell = it.next()
+            val q = Bytes.toString(CellUtil.cloneQualifier(cell))
+            if (q.equals("counter")) {
+              b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
+            } else {
+              b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
+            }
+          }
+          b.toString
+        } else {
+          ""
+        }
+      })
+    val getArray = getRdd.collect()
+
+    assert(getArray.length == 4)
+    assert(getArray.contains("get1:(a,foo1)"))
+    assert(getArray.contains("get2:(a,foo2)"))
+    assert(getArray.contains("get3:(a,foo3)"))
+
+  }
+
+  test("BulkGet failure test: bad table") {
+    val config = TEST_UTIL.getConfiguration
+
+    val rdd = sc.parallelize(Array(
+      Bytes.toBytes("get1"),
+      Bytes.toBytes("get2"),
+      Bytes.toBytes("get3"),
+      Bytes.toBytes("get4")))
+    val hbaseContext = new HBaseContext(sc, config)
+
+    intercept[SparkException] {
+      try {
+        val getRdd = hbaseContext.bulkGet[Array[Byte], String](
+          TableName.valueOf("badTableName"),
+          2,
+          rdd,
+          record => {
+            new Get(record)
+          },
+          (result: Result) => "1")
+
+        getRdd.collect()
+
+        fail("We should have failed and not reached this line")
+      } catch {
+        case ex: SparkException => {
+          assert(
+            ex.getMessage.contains(
+              "org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException"))
+          throw ex
+        }
+      }
+    }
+  }
+
+  test("BulkGet failure test: bad column") {
+
+    val config = TEST_UTIL.getConfiguration
+    val connection = ConnectionFactory.createConnection(config)
+    val table = connection.getTable(TableName.valueOf("t1"))
+
+    try {
+      var put = new Put(Bytes.toBytes("get1"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("get2"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("get3"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+      table.put(put)
+    } finally {
+      table.close()
+      connection.close()
+    }
+
+    val rdd = sc.parallelize(Array(
+      Bytes.toBytes("get1"),
+      Bytes.toBytes("get2"),
+      Bytes.toBytes("get3"),
+      Bytes.toBytes("get4")))
+    val hbaseContext = new HBaseContext(sc, config)
+
+    val getRdd = hbaseContext.bulkGet[Array[Byte], String](
+      TableName.valueOf(tableName),
+      2,
+      rdd,
+      record => {
+        new Get(record)
+      },
+      (result: Result) => {
+        if (result.listCells() != null) {
+          val cellValue = result.getColumnLatestCell(
+            Bytes.toBytes("c"), Bytes.toBytes("bad_column"))
+          if (cellValue == null) "null" else "bad"
+        } else "noValue"
+      })
+    var nullCounter = 0
+    var noValueCounter = 0
+    getRdd.collect().foreach(r => {
+      if ("null".equals(r)) nullCounter += 1
+      else if ("noValue".equals(r)) noValueCounter += 1
+    })
+    assert(nullCounter == 3)
+    assert(noValueCounter == 1)
+  }
+
+  test("distributedScan to test HBase client") {
+    val config = TEST_UTIL.getConfiguration
+    val connection = ConnectionFactory.createConnection(config)
+    val table = connection.getTable(TableName.valueOf("t1"))
+
+    try {
+      var put = new Put(Bytes.toBytes("scan1"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("scan2"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("scan3"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("scan4"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+      table.put(put)
+      put = new Put(Bytes.toBytes("scan5"))
+      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+      table.put(put)
+    } finally {
+      table.close()
+      connection.close()
+    }
+
+    val hbaseContext = new HBaseContext(sc, config)
+
+    val scan = new Scan()
+    scan.setCaching(100)
+    scan.setStartRow(Bytes.toBytes("scan2"))
+    scan.setStopRow(Bytes.toBytes("scan4_"))
+
+    val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
+
+    val scanList = scanRdd.map(r => r._1.copyBytes()).collect()
+
+    assert(scanList.length == 3)
+  }
+}
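
The distributedScan test above collects whole Result objects and only checks the row count.
When a single column is all that is needed, the Scan can be narrowed before it is handed to
hbaseRDD. A small sketch, reusing the suite's sc, config, tableName and columnFamily values;
the qualifier "a" matches the cells the test writes and is an illustrative choice, not part of
the patch:

    val hbaseContext = new HBaseContext(sc, config)

    val scan = new Scan()
    scan.setCaching(100)
    // Only bring back family "c", qualifier "a" rather than every cell in the row.
    scan.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"))

    val rowsAndValues = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
      .map { case (key, result) =>
        (Bytes.toString(key.copyBytes()),
          Bytes.toString(result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes("a"))))
      }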

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala
new file mode 100644
index 0000000..007aa84
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseTestingUtility}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+import scala.collection.mutable
+
+class HBaseDStreamFunctionsSuite extends FunSuite with
+BeforeAndAfterEach with BeforeAndAfterAll with Logging {
+  @transient var sc: SparkContext = null
+
+  var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
+
+  val tableName = "t1"
+  val columnFamily = "c"
+
+  override def beforeAll() {
+    TEST_UTIL.startMiniCluster()
+
+    logInfo(" - minicluster started")
+    try {
+      TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+    } catch {
+      case e: Exception =>
+        logInfo(" - no table " + tableName + " found")
+    }
+    logInfo(" - creating table " + tableName)
+    TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
+    logInfo(" - created table")
+
+    sc = new SparkContext("local", "test")
+  }
+
+  override def afterAll() {
+    TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+    TEST_UTIL.shutdownMiniCluster()
+    sc.stop()
+  }
+
+  test("bulkput to test HBase client") {
+    val config = TEST_UTIL.getConfiguration
+    val rdd1 = sc.parallelize(Array(
+      (Bytes.toBytes("1"),
+        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
+      (Bytes.toBytes("2"),
+        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))),
+      (Bytes.toBytes("3"),
+        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3"))))))
+
+    val rdd2 = sc.parallelize(Array(
+      (Bytes.toBytes("4"),
+        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))),
+      (Bytes.toBytes("5"),
+        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
+
+    val hbaseContext = new HBaseContext(sc, config)
+    val ssc = new StreamingContext(sc, Milliseconds(200))
+
+    val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
+      Array[Byte], Array[Byte])])]]()
+    queue += rdd1
+    queue += rdd2
+    val dStream = ssc.queueStream(queue)
+
+    dStream.hbaseBulkPut(
+      hbaseContext,
+      TableName.valueOf(tableName),
+      (putRecord) => {
+        val put = new Put(putRecord._1)
+        putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
+        put
+      })
+
+    ssc.start()
+
+    ssc.awaitTerminationOrTimeout(1000)
+
+    val connection = ConnectionFactory.createConnection(config)
+    val table = connection.getTable(TableName.valueOf("t1"))
+
+    try {
+      val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a"))))
+      assert(foo1 == "foo1")
+
+      val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b"))))
+      assert(foo2 == "foo2")
+
+      val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c"))))
+      assert(foo3 == "foo3")
+
+      val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d"))))
+      assert(foo4 == "foo")
+
+      val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))).
+        getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e"))))
+      assert(foo5 == "bar")
+    } finally {
+      table.close()
+      connection.close()
+    }
+  }
+
+}
\ No newline at end of file
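
The streaming test feeds hbaseBulkPut from a queueStream, which is convenient inside a unit
test but not how a real job is usually wired. The sketch below shows the same call against a
live source; the socket receiver, the host/port, the one-second batch interval and the
"rowKey,family,qualifier,value" line format are illustrative assumptions, and the table name
"t1" is simply carried over from the suite:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamingBulkPutSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("StreamingBulkPutSketch"))
        val ssc = new StreamingContext(sc, Seconds(1))
        val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

        // Each incoming line is assumed to look like "rowKey,family,qualifier,value".
        val lines = ssc.socketTextStream("localhost", 9999)

        lines.hbaseBulkPut(hbaseContext, TableName.valueOf("t1"), line => {
          val cells = line.split(",")
          val put = new Put(Bytes.toBytes(cells(0)))
          put.addColumn(Bytes.toBytes(cells(1)), Bytes.toBytes(cells(2)), Bytes.toBytes(cells(3)))
          put
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }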