You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2015/07/28 18:47:04 UTC
[2/3] hbase git commit: HBASE-13992 Integrate SparkOnHBase into HBase
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
new file mode 100644
index 0000000..4839892
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.function.VoidFunction
+import org.apache.spark.api.java.function.Function
+import org.apache.hadoop.hbase.client.Connection
+import org.apache.spark.streaming.api.java.JavaDStream
+import org.apache.spark.api.java.function.FlatMapFunction
+import scala.collection.JavaConversions._
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.client.Delete
+import org.apache.hadoop.hbase.client.Get
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import scala.reflect.ClassTag
+
+/**
+ * This is the Java wrapper over HBaseContext, which is written in
+ * Scala. This class is intended for developers who want to
+ * work with Spark or Spark Streaming from Java.
+ *
+ * @param jsc This is the JavaSparkContext that we will wrap
+ * @param config This is the config information for our HBase cluster
+ */
+class JavaHBaseContext(@transient jsc: JavaSparkContext,
+                       @transient config: Configuration) extends Serializable {
+  val hbaseContext = new HBaseContext(jsc.sc, config)
+
+  /**
+   * A simple enrichment of the traditional Spark JavaRDD foreachPartition.
+   * This function differs from the original in that it offers the
+   * developer access to an already connected Connection object.
+   *
+   * Note: Do not close the Connection object. All Connection
+   * management is handled outside this method.
+   *
+   * @param javaRdd Original JavaRDD with data to iterate over
+   * @param f       Function to be given a java.util.Iterator over
+   *                the RDD values and a Connection object to interact
+   *                with HBase
+   */
+  def foreachPartition[T](javaRdd: JavaRDD[T],
+                          f: VoidFunction[(java.util.Iterator[T], Connection)]) = {
+    // Convert the Scala partition iterator to the java.util.Iterator the
+    // Java-facing function expects, and pass the pair as one tuple argument.
+    hbaseContext.foreachPartition(javaRdd.rdd,
+      (it: Iterator[T], conn: Connection) => f.call((asJavaIterator(it), conn)))
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming DStream foreach.
+   * This function differs from the original in that it offers the
+   * developer access to an already connected Connection object.
+   *
+   * Note: Do not close the Connection object. All Connection
+   * management is handled outside this method.
+   *
+   * Note: unlike the JavaRDD overload, `f` receives a
+   * scala.collection.Iterator, as declared in its signature.
+   *
+   * @param javaDstream Original JavaDStream with data to iterate over
+   * @param f           Function to be given an iterator over
+   *                    the JavaDStream values and a Connection object to
+   *                    interact with HBase
+   */
+  def foreachPartition[T](javaDstream: JavaDStream[T],
+                          f: VoidFunction[(Iterator[T], Connection)]) = {
+    // f takes a single tuple argument; build it explicitly instead of
+    // relying on the compiler's (deprecated) argument auto-tupling.
+    hbaseContext.foreachPartition(javaDstream.dstream,
+      (it: Iterator[T], conn: Connection) => f.call((it, conn)))
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark JavaRDD mapPartition.
+   * This function differs from the original in that it offers the
+   * developer access to an already connected Connection object.
+   *
+   * Note: Do not close the Connection object. All Connection
+   * management is handled outside this method.
+   *
+   * Note: Make sure to partition correctly to avoid memory issues when
+   * getting data from HBase.
+   *
+   * @param javaRdd Original JavaRDD with data to iterate over
+   * @param f       Function to be given a java.util.Iterator over
+   *                the RDD values and a Connection object to interact
+   *                with HBase; the Iterable it returns becomes the
+   *                partition's output
+   * @return Returns a new JavaRDD generated by the user definition
+   *         function just like normal mapPartition
+   */
+  def mapPartitions[T, R](javaRdd: JavaRDD[T],
+                          f: FlatMapFunction[(java.util.Iterator[T],
+                            Connection), R]): JavaRDD[R] = {
+    // The closure references only `f`. The previous local `def fn` helper
+    // also tied the shipped closure to this enclosing instance.
+    JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
+      (it: Iterator[T], conn: Connection) =>
+        asScalaIterator(f.call((asJavaIterator(it), conn)).iterator())
+    )(fakeClassTag[R]))(fakeClassTag[R])
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming JavaDStream
+   * mapPartition.
+   *
+   * This function differs from the original in that it offers the
+   * developer access to an already connected Connection object.
+   *
+   * Note: Do not close the Connection object. All Connection
+   * management is handled outside this method.
+   *
+   * Note: Make sure to partition correctly to avoid memory issues when
+   * getting data from HBase.
+   *
+   * @param javaDstream Original JavaDStream with data to iterate over
+   * @param mp          Function to be given a Scala iterator over
+   *                    the JavaDStream values and a Connection object to
+   *                    interact with HBase
+   * @return Returns a new JavaDStream generated by the user
+   *         definition function just like normal mapPartition
+   */
+  def streamMap[T, U](javaDstream: JavaDStream[T],
+                      mp: Function[(Iterator[T], Connection), Iterator[U]]):
+  JavaDStream[U] = {
+    // Pass the pair as one explicit tuple rather than auto-tupling.
+    JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
+      (it: Iterator[T], conn: Connection) =>
+        mp.call((it, conn)))(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.bulkPut method.
+   *
+   * It allows a user to take a JavaRDD, generate Puts from it and
+   * send them to HBase. The complexity of managing the Connection is
+   * removed from the developer.
+   *
+   * @param javaDdd   Original JavaRDD with data to iterate over
+   * @param tableName The name of the table to put into
+   * @param f         Function to convert a value in the JavaRDD
+   *                  to an HBase Put
+   */
+  def bulkPut[T](javaDdd: JavaRDD[T],
+                 tableName: TableName,
+                 f: Function[T, Put]): Unit = {
+    hbaseContext.bulkPut(javaDdd.rdd, tableName, (t: T) => f.call(t))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamBulkPut method.
+   *
+   * It allows a user to take a JavaDStream, generate Puts from it and
+   * send them to HBase.
+   *
+   * The complexity of managing the Connection is
+   * removed from the developer.
+   *
+   * @param javaDstream Original JavaDStream with data to iterate over
+   * @param tableName   The name of the table to put into
+   * @param f           Function to convert a value in
+   *                    the JavaDStream to an HBase Put
+   */
+  def streamBulkPut[T](javaDstream: JavaDStream[T],
+                       tableName: TableName,
+                       f: Function[T, Put]) = {
+    hbaseContext.streamBulkPut(javaDstream.dstream,
+      tableName,
+      (t: T) => f.call(t))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.bulkDelete method.
+   *
+   * It allows a user to take a JavaRDD, generate Deletes from it and
+   * send them to HBase.
+   *
+   * The complexity of managing the Connection is
+   * removed from the developer.
+   *
+   * @param javaRdd   Original JavaRDD with data to iterate over
+   * @param tableName The name of the table to delete from
+   * @param f         Function to convert a value in the JavaRDD to an
+   *                  HBase Delete
+   * @param batchSize The number of deletes to batch before sending to HBase
+   */
+  def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
+                    f: Function[T, Delete], batchSize: Integer): Unit = {
+    hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamBulkDelete method.
+   *
+   * It allows a user to take a JavaDStream, generate Deletes from it and
+   * send them to HBase.
+   *
+   * The complexity of managing the Connection is
+   * removed from the developer.
+   *
+   * @param javaDstream Original JavaDStream with data to iterate over
+   * @param tableName   The name of the table to delete from
+   * @param f           Function to convert a value in the JavaDStream to an
+   *                    HBase Delete
+   * @param batchSize   The number of deletes to be sent at once
+   */
+  def streamBulkDelete[T](javaDstream: JavaDStream[T],
+                          tableName: TableName,
+                          f: Function[T, Delete],
+                          batchSize: Integer) = {
+    hbaseContext.streamBulkDelete(javaDstream.dstream, tableName,
+      (t: T) => f.call(t),
+      batchSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.bulkGet method.
+   *
+   * It allows a user to take a JavaRDD and generate a new JavaRDD
+   * based on Gets and the results they bring back from HBase.
+   *
+   * @param tableName     The name of the table to get from
+   * @param batchSize     Batch size of how many gets to retrieve in a single fetch
+   * @param javaRdd       Original JavaRDD with data to iterate over
+   * @param makeGet       Function to convert a value in the JavaRDD to an
+   *                      HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                      whatever the user wants to put in the resulting
+   *                      JavaRDD
+   * @return New JavaRDD that is created by the Gets to HBase
+   */
+  def bulkGet[T, U](tableName: TableName,
+                    batchSize: Integer,
+                    javaRdd: JavaRDD[T],
+                    makeGet: Function[T, Get],
+                    convertResult: Function[Result, U]): JavaRDD[U] = {
+    JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
+      batchSize,
+      javaRdd.rdd,
+      (t: T) => makeGet.call(t),
+      (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamBulkGet method.
+   *
+   * It allows a user to take a JavaDStream and generate a new JavaDStream
+   * based on Gets and the results they bring back from HBase.
+   *
+   * @param tableName     The name of the table to get from
+   * @param batchSize     The number of gets to be batched together
+   * @param javaDStream   Original JavaDStream with data to iterate over
+   * @param makeGet       Function to convert a value in the JavaDStream to an
+   *                      HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                      whatever the user wants to put in the resulting
+   *                      JavaDStream
+   * @return New JavaDStream that is created by the Gets to HBase
+   */
+  def streamBulkGet[T, U](tableName: TableName,
+                          batchSize: Integer,
+                          javaDStream: JavaDStream[T],
+                          makeGet: Function[T, Get],
+                          convertResult: Function[Result, U]): JavaDStream[U] = {
+    // The result type must be declared. With procedure syntax the method
+    // returned Unit and the constructed JavaDStream was thrown away.
+    JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
+      batchSize,
+      javaDStream.dstream,
+      (t: T) => makeGet.call(t),
+      (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * This function will use the native HBase TableInputFormat with the
+   * given scan object to generate a new JavaRDD.
+   *
+   * @param tableName The name of the table to scan
+   * @param scans     The HBase scan object to use to read data from HBase
+   * @param f         Function to convert a (row key, Result) pair from HBase
+   *                  into what the user wants in the final generated JavaRDD
+   * @return New JavaRDD with results from scan
+   */
+  def hbaseRDD[U](tableName: TableName,
+                  scans: Scan,
+                  f: Function[(ImmutableBytesWritable, Result), U]):
+  JavaRDD[U] = {
+    // f expects the whole pair, so hand the tuple over as-is.
+    JavaRDD.fromRDD(
+      hbaseContext.hbaseRDD[U](tableName,
+        scans,
+        (v: (ImmutableBytesWritable, Result)) =>
+          f.call(v))(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * An overloaded version of hbaseRDD that returns the raw
+   * (ImmutableBytesWritable, Result) pairs of the scan.
+   *
+   * @param tableName The name of the table to scan
+   * @param scans     The HBase scan object to use to read data from HBase
+   * @return New JavaRDD with results from scan
+   */
+  def hbaseRDD(tableName: TableName,
+               scans: Scan):
+  JavaRDD[(ImmutableBytesWritable, Result)] = {
+    JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
+  }
+
+  /**
+   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+   *
+   * This method is used to keep ClassTags out of the external Java API, as the Java compiler
+   * cannot produce them automatically. While this ClassTag-faking does please the compiler,
+   * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
+   *
+   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
+   * just worse performance or security issues.
+   * For instance, an Array[AnyRef] can hold any type T,
+   * but may lose primitive specialization.
+   */
+  private[spark]
+  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
new file mode 100644
index 0000000..f77721f
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Delete
+import org.apache.spark.SparkConf
+
+/**
+ * This is a simple example of deleting records in HBase
+ * with the bulkDelete function.
+ *
+ * Usage: HBaseBulkDeleteExample {tableName}
+ *
+ * Deletes the rows whose keys are "1" through "5"; the last argument
+ * passed to bulkDelete (4) is the batch size.
+ */
+object HBaseBulkDeleteExample {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 1) {
+      println("HBaseBulkDeleteExample {tableName}")
+    } else {
+      val tableName = args(0)
+
+      val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName)
+      val sc = new SparkContext(sparkConf)
+      try {
+        // RDD[Array[Byte]] holding the row keys to delete
+        val rdd = sc.parallelize(Array(
+          Bytes.toBytes("1"),
+          Bytes.toBytes("2"),
+          Bytes.toBytes("3"),
+          Bytes.toBytes("4"),
+          Bytes.toBytes("5")
+        ))
+
+        val conf = HBaseConfiguration.create()
+
+        val hbaseContext = new HBaseContext(sc, conf)
+        hbaseContext.bulkDelete[Array[Byte]](rdd,
+          TableName.valueOf(tableName),
+          rowKey => new Delete(rowKey),
+          4)
+      } finally {
+        // Always release the SparkContext, even if the delete fails
+        sc.stop()
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala
new file mode 100644
index 0000000..88f52fb
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Get
+import org.apache.hadoop.hbase.client.Result
+import org.apache.spark.SparkConf
+
+/**
+ * This is a simple example of getting records in HBase
+ * with the bulkGet function.
+ *
+ * Usage: HBaseBulkGetExample {tableName}
+ *
+ * Issues Gets for row keys "1" through "7" (2 per batch) and prints one
+ * line per row of the form `row:(qualifier,value)...`. A qualifier named
+ * "counter" is decoded as a long; every other value is decoded as a String.
+ */
+object HBaseBulkGetExample {
+  def main(args: Array[String]): Unit = {
+    import scala.collection.JavaConverters._
+
+    if (args.length < 1) {
+      println("HBaseBulkGetExample {tableName}")
+    } else {
+      val tableName = args(0)
+
+      val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName)
+      val sc = new SparkContext(sparkConf)
+
+      try {
+        // RDD[Array[Byte]] holding the row keys to fetch
+        val rdd = sc.parallelize(Array(
+          Bytes.toBytes("1"),
+          Bytes.toBytes("2"),
+          Bytes.toBytes("3"),
+          Bytes.toBytes("4"),
+          Bytes.toBytes("5"),
+          Bytes.toBytes("6"),
+          Bytes.toBytes("7")))
+
+        val conf = HBaseConfiguration.create()
+
+        val hbaseContext = new HBaseContext(sc, conf)
+
+        val getRdd = hbaseContext.bulkGet[Array[Byte], String](
+          TableName.valueOf(tableName),
+          2,
+          rdd,
+          record => {
+            System.out.println("making Get")
+            new Get(record)
+          },
+          (result: Result) => {
+            // listCells() returns null when the Get matched no cells
+            // (e.g. the row does not exist), so treat that as "no cells".
+            val cells = Option(result.listCells()).map(_.asScala).getOrElse(Seq.empty)
+            val b = new StringBuilder
+
+            b.append(Bytes.toString(result.getRow) + ":")
+
+            cells.foreach { cell =>
+              val q = Bytes.toString(CellUtil.cloneQualifier(cell))
+              if (q.equals("counter")) {
+                b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
+              } else {
+                b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
+              }
+            }
+            b.toString()
+          })
+
+        getRdd.collect().foreach(v => println(v))
+
+      } finally {
+        sc.stop()
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala
new file mode 100644
index 0000000..735efed
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Put
+import org.apache.spark.SparkConf
+
+/**
+ * Example that writes a handful of rows into HBase using the
+ * bulkPut function of HBaseContext.
+ *
+ * Usage: HBaseBulkPutExample {tableName} {columnFamily}
+ *
+ * Rows "1" through "5" each receive one cell in the given column
+ * family, under qualifier "1", whose value equals the row key.
+ */
+object HBaseBulkPutExample {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 2) {
+      println("HBaseBulkPutExample {tableName} {columnFamily}")
+    } else {
+      val tableName = args(0)
+      val columnFamily = args(1)
+
+      val sc = new SparkContext(new SparkConf().setAppName(
+        "HBaseBulkPutExample " + tableName + " " + columnFamily))
+
+      try {
+        // Each element: (rowKey, Array((family, qualifier, value)))
+        val rdd = sc.parallelize((1 to 5).map { i =>
+          val id = i.toString
+          (Bytes.toBytes(id),
+            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes(id))))
+        })
+
+        val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
+        hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
+          TableName.valueOf(tableName),
+          record => {
+            val (rowKey, cells) = record
+            val put = new Put(rowKey)
+            cells.foreach { case (family, qualifier, value) =>
+              put.addColumn(family, qualifier, value)
+            }
+            put
+          })
+      } finally {
+        sc.stop()
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala
new file mode 100644
index 0000000..3fd3006
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.Text
+import org.apache.spark.SparkConf
+
+/**
+ * This is a simple example of putting records in HBase
+ * with the bulkPut function. In this example the put information
+ * comes from a text file.
+ *
+ * Usage: HBaseBulkPutExampleFromFile {tableName} {columnFamily} {inputFile}
+ *
+ * For every line of inputFile a row keyed "Value- " + line is written,
+ * with one cell (qualifier "1") in the given column family holding the
+ * line's length encoded with Bytes.toBytes(Int).
+ */
+object HBaseBulkPutExampleFromFile {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 3) {
+      println("HBaseBulkPutExampleFromFile {tableName} {columnFamily} {inputFile}")
+    } else {
+      val tableName = args(0)
+      val columnFamily = args(1)
+      val inputFile = args(2)
+
+      val sparkConf = new SparkConf().setAppName("HBaseBulkPutExampleFromFile " +
+        tableName + " " + columnFamily + " " + inputFile)
+      val sc = new SparkContext(sparkConf)
+
+      try {
+        // One String per input line. The Text value is converted right away
+        // because Hadoop record readers may reuse the same Writable instance.
+        val rdd = sc.hadoopFile(
+          inputFile,
+          classOf[TextInputFormat],
+          classOf[LongWritable],
+          classOf[Text]).map(v => {
+          System.out.println("reading-" + v._2.toString)
+          v._2.toString
+        })
+
+        val conf = HBaseConfiguration.create()
+
+        val hbaseContext = new HBaseContext(sc, conf)
+        hbaseContext.bulkPut[String](rdd,
+          TableName.valueOf(tableName),
+          (putRecord) => {
+            System.out.println("hbase-" + putRecord)
+            val put = new Put(Bytes.toBytes("Value- " + putRecord))
+            // Write into the column family supplied on the command line.
+            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("1"),
+              Bytes.toBytes(putRecord.length()))
+            put
+          })
+      } finally {
+        sc.stop()
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala
new file mode 100644
index 0000000..ae92f37
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Put
+import org.apache.spark.SparkConf
+
+/**
+ * Example of the bulkPut function that also sets an explicit
+ * timestamp on every cell it writes.
+ *
+ * Usage: HBaseBulkPutTimestampExample {tableName} {columnFamily}
+ *
+ * Rows "6" through "10" receive one cell each (qualifier "1") holding
+ * the values "1" through "5" respectively. All cells share a single
+ * timestamp taken once on the driver via System.currentTimeMillis().
+ */
+object HBaseBulkPutTimestampExample {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 2) {
+      System.out.println("HBaseBulkPutTimestampExample {tableName} {columnFamily}")
+    } else {
+      val tableName = args(0)
+      val columnFamily = args(1)
+
+      val appName = "HBaseBulkPutTimestampExample " + tableName + " " + columnFamily
+      val sc = new SparkContext(new SparkConf().setAppName(appName))
+
+      try {
+        // Each element: (rowKey, Array((family, qualifier, value)))
+        val rows = (6 to 10).map { rowNum =>
+          (Bytes.toBytes(rowNum.toString),
+            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"),
+              Bytes.toBytes((rowNum - 5).toString))))
+        }
+        val rdd = sc.parallelize(rows)
+
+        val conf = HBaseConfiguration.create()
+
+        // Captured once here and shipped inside the closure below
+        val timeStamp = System.currentTimeMillis()
+
+        val hbaseContext = new HBaseContext(sc, conf)
+        hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
+          TableName.valueOf(tableName),
+          { case (rowKey, cells) =>
+            val put = new Put(rowKey)
+            for ((family, qualifier, value) <- cells) {
+              put.addColumn(family, qualifier, timeStamp, value)
+            }
+            put
+          })
+      } finally {
+        sc.stop()
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala
new file mode 100644
index 0000000..852b198
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.spark.SparkConf
+/**
+ * This is a simple example of scanning records from HBase
+ * with the hbaseRDD function.
+ *
+ * Usage: HBaseDistributedScanExample {tableName}
+ *
+ * Prints every row key of the table (on the executors) and then the
+ * total number of rows scanned (on the driver).
+ */
+object HBaseDistributedScanExample {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 1) {
+      println("HBaseDistributedScanExample {tableName}")
+    } else {
+      val tableName = args(0)
+
+      val sparkConf = new SparkConf().setAppName("HBaseDistributedScanExample " + tableName)
+      val sc = new SparkContext(sparkConf)
+
+      try {
+        val conf = HBaseConfiguration.create()
+
+        val hbaseContext = new HBaseContext(sc, conf)
+
+        val scan = new Scan()
+        // Number of rows the scanner caches per fetch (see Scan.setCaching)
+        scan.setCaching(100)
+
+        val getRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
+
+        // foreach runs on the executors, so these lines appear in executor
+        // stdout. Decode only the writable's [offset, offset + length) window.
+        getRdd.foreach { case (key, _) =>
+          println(Bytes.toString(key.get(), key.getOffset, key.getLength))
+        }
+
+        // count() is computed on the executors; there is no need to ship
+        // every row key back to the driver just to measure the total.
+        println("Length: " + getRdd.count())
+      } finally {
+        sc.stop()
+      }
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
new file mode 100644
index 0000000..29afa49
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.hbasecontext
+
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.spark.SparkContext
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Put
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.SparkConf
+
+/**
+ * This is a simple example of BulkPut with Spark Streaming.
+ *
+ * Usage: HBaseStreamingBulkPutExample {host} {port} {tableName} {columnFamily}
+ *
+ * Each non-empty line read from the socket at host:port is used as a row key,
+ * and a Put writes the value "bar" to column {columnFamily}:foo of that row.
+ * The streaming job is awaited for at most 60 seconds before Spark is stopped.
+ */
+object HBaseStreamingBulkPutExample {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 4) {
+      println("HBaseStreamingBulkPutExample " +
+        "{host} {port} {tableName} {columnFamily}")
+    } else {
+      run(args(0), args(1).toInt, args(2), args(3))
+    }
+  }
+
+  /** Streams lines from host:port into `tableName` for up to 60 seconds. */
+  private def run(host: String, port: Int, tableName: String, columnFamily: String): Unit = {
+    val sparkConf = new SparkConf().setAppName("HBaseStreamingBulkPutExample " +
+      tableName + " " + columnFamily)
+    val sc = new SparkContext(sparkConf)
+    try {
+      val ssc = new StreamingContext(sc, Seconds(1))
+
+      // Drop empty lines up front. An empty row key is not a valid Put, and the
+      // previous approach of returning null from the put function hands the
+      // mutator something that is not a mutation at all.
+      val lines = ssc.socketTextStream(host, port).filter(line => line.nonEmpty)
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      hbaseContext.streamBulkPut[String](lines,
+        TableName.valueOf(tableName),
+        (putRecord) => {
+          val put = new Put(Bytes.toBytes(putRecord))
+          // Use the family given on the command line instead of a hard-coded "c".
+          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("foo"), Bytes.toBytes("bar"))
+          put
+        })
+      ssc.start()
+      ssc.awaitTerminationOrTimeout(60000)
+    } finally {
+      sc.stop()
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
new file mode 100644
index 0000000..b8f40a8
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.rdd
+
+import org.apache.hadoop.hbase.client.Delete
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.util.Bytes
+
+import org.apache.spark.{SparkContext, SparkConf}
+
+/**
+ * This is a simple example of deleting records in HBase
+ * with the bulkDelete function.
+ *
+ * Usage: HBaseBulkDeleteExample {tableName}
+ *
+ * Deletes the rows whose keys are "1" through "5" from the given table.
+ */
+object HBaseBulkDeleteExample {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 1) {
+      println("HBaseBulkDeleteExample {tableName}")
+    } else {
+      run(args(0))
+    }
+  }
+
+  /** Issues a Delete for each sample row key against `tableName`. */
+  private def run(tableName: String): Unit = {
+    val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName)
+    val sc = new SparkContext(sparkConf)
+    try {
+      // RDD[Array[Byte]] of the row keys "1".."5".
+      val rdd = sc.parallelize((1 to 5).map(i => Bytes.toBytes(i.toString)))
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      // The trailing 4 is passed through to hbaseBulkDelete
+      // (presumably the delete batch size -- confirm against HBaseRDDFunctions).
+      rdd.hbaseBulkDelete(hbaseContext, TableName.valueOf(tableName),
+        rowKey => new Delete(rowKey),
+        4)
+    } finally {
+      sc.stop()
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
new file mode 100644
index 0000000..9d59e96
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.rdd
+
+import org.apache.hadoop.hbase.client.{Result, Get}
+import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.spark.{SparkContext, SparkConf}
+
+/**
+ * This is a simple example of getting records in HBase
+ * with the bulkGet function.
+ *
+ * Usage: HBaseBulkGetExample {tableName}
+ *
+ * Fetches rows "1" through "7" and prints each as
+ * "row:(qualifier,value)(qualifier,value)...". A cell whose qualifier is
+ * "counter" has its value decoded as a long; all others are decoded as strings.
+ */
+object HBaseBulkGetExample {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 1) {
+      println("HBaseBulkGetExample {tableName}")
+    } else {
+      run(args(0))
+    }
+  }
+
+  /** Bulk-gets the sample row keys from `tableName` and prints the results. */
+  private def run(tableName: String): Unit = {
+    val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+      // RDD[Array[Byte]] of the row keys "1".."7".
+      val rdd = sc.parallelize((1 to 7).map(i => Bytes.toBytes(i.toString)))
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2,
+        record => {
+          System.out.println("making Get")
+          new Get(record)
+        },
+        (result: Result) => formatResult(result))
+
+      getRdd.collect().foreach(v => println(v))
+    } finally {
+      sc.stop()
+    }
+  }
+
+  /**
+   * Renders a Result as "row:(qualifier,value)...".
+   *
+   * A Result with no cells (e.g. the requested row does not exist) renders as
+   * just the row prefix. Its listCells()/rawCells() are null in that case, so
+   * iterating them directly would throw a NullPointerException.
+   */
+  private def formatResult(result: Result): String = {
+    val prefix = Bytes.toString(result.getRow) + ":"
+    if (result.isEmpty) {
+      prefix
+    } else {
+      result.rawCells().map { cell =>
+        val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
+        val value = CellUtil.cloneValue(cell)
+        val rendered =
+          if (qualifier == "counter") Bytes.toLong(value).toString else Bytes.toString(value)
+        "(" + qualifier + "," + rendered + ")"
+      }.mkString(prefix, "", "")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
new file mode 100644
index 0000000..2d07e89
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.rdd
+
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+ * Writes five sample rows into HBase using the hbaseBulkPut RDD function.
+ *
+ * Usage: HBaseBulkPutExample {tableName} {columnFamily}
+ *
+ * Row keys are "1" through "5". Each row receives a single cell in
+ * {columnFamily} with qualifier "1" whose value equals the row key.
+ */
+object HBaseBulkPutExample {
+  def main(args: Array[String]): Unit = args match {
+    case Array(tableName, columnFamily, _*) => run(tableName, columnFamily)
+    case _ => println("HBaseBulkPutExample {tableName} {columnFamily}")
+  }
+
+  /** Builds the sample RDD and bulk-puts it into `tableName`. */
+  private def run(tableName: String, columnFamily: String): Unit = {
+    val sc = new SparkContext(
+      new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily))
+
+    try {
+      // Each element is (rowKey, Array((family, qualifier, value))).
+      val rows = (1 to 5).map { n =>
+        val key = n.toString
+        (Bytes.toBytes(key),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes(key))))
+      }
+      val rdd = sc.parallelize(rows)
+
+      val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
+
+      rdd.hbaseBulkPut(hbaseContext, TableName.valueOf(tableName),
+        putRecord => {
+          val (rowKey, cells) = putRecord
+          val put = new Put(rowKey)
+          cells.foreach { case (family, qualifier, value) =>
+            put.addColumn(family, qualifier, value)
+          }
+          put
+        })
+    } finally {
+      sc.stop()
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
new file mode 100644
index 0000000..e2ad224
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.rdd
+
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.{SparkContext, SparkConf}
+
+/**
+ * This is a simple example of using the foreachPartition
+ * method with a HBase connection.
+ *
+ * Usage: HBaseForeachPartitionExample {tableName} {columnFamily}
+ *
+ * Each partition opens a BufferedMutator on the table, writes one Put per
+ * record, then flushes and closes the mutator.
+ */
+object HBaseForeachPartitionExample {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 2) {
+      println("HBaseForeachPartitionExample {tableName} {columnFamily}")
+    } else {
+      run(args(0), args(1))
+    }
+  }
+
+  /** Writes five sample rows into `tableName` using per-partition mutators. */
+  private def run(tableName: String, columnFamily: String): Unit = {
+    val sparkConf = new SparkConf().setAppName("HBaseForeachPartitionExample " +
+      tableName + " " + columnFamily)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+      // Each element is (rowKey, Array((family, qualifier, value))).
+      val rdd = sc.parallelize((1 to 5).map { n =>
+        val key = n.toString
+        (Bytes.toBytes(key),
+          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes(key))))
+      })
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      rdd.hbaseForeachPartition(hbaseContext,
+        (it, connection) => {
+          val mutator = connection.getBufferedMutator(TableName.valueOf(tableName))
+          try {
+            it.foreach { case (rowKey, cells) =>
+              val put = new Put(rowKey)
+              cells.foreach { case (family, qualifier, value) =>
+                put.addColumn(family, qualifier, value)
+              }
+              mutator.mutate(put)
+            }
+            mutator.flush()
+          } finally {
+            // Release the mutator even when a mutate or flush fails part way.
+            mutator.close()
+          }
+        })
+    } finally {
+      sc.stop()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
new file mode 100644
index 0000000..bc444be
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.example.rdd
+
+import org.apache.hadoop.hbase.client.Get
+import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.{SparkContext, SparkConf}
+
+/**
+ * This is a simple example of using the mapPartitions
+ * method with a HBase connection.
+ *
+ * Usage: HBaseMapPartitionExample {tableName}
+ *
+ * Each partition opens a Table, issues one Get per row key, and renders every
+ * Result as "row:(qualifier,value)...". A cell whose qualifier is "counter" has
+ * its value decoded as a long; all others are decoded as strings.
+ */
+object HBaseMapPartitionExample {
+  import org.apache.hadoop.hbase.CellUtil
+  import org.apache.hadoop.hbase.client.Result
+
+  def main(args: Array[String]): Unit = {
+    if (args.length < 1) {
+      println("HBaseMapPartitionExample {tableName}")
+    } else {
+      run(args(0))
+    }
+  }
+
+  /** Fetches the sample row keys from `tableName` per partition and prints them. */
+  private def run(tableName: String): Unit = {
+    val sparkConf = new SparkConf().setAppName("HBaseMapPartitionExample " + tableName)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+      // RDD[Array[Byte]] of the row keys "1".."7".
+      val rdd = sc.parallelize((1 to 7).map(i => Bytes.toBytes(i.toString)))
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      val getRdd = rdd.hbaseMapPartitions[String](hbaseContext, (rowKeys, connection) => {
+        val table = connection.getTable(TableName.valueOf(tableName))
+        try {
+          // Materialize before returning. A lazily mapped iterator would only be
+          // consumed after this function exits, when the table is closed and the
+          // connection may already have been closed by HBaseContext.
+          rowKeys.map { rowKey =>
+            // batching would be faster. This is just an example
+            formatResult(table.get(new Get(rowKey)))
+          }.toList.iterator
+        } finally {
+          table.close()
+        }
+      })
+
+      getRdd.collect().foreach(v => println(v))
+    } finally {
+      sc.stop()
+    }
+  }
+
+  /**
+   * Renders a Result as "row:(qualifier,value)...".
+   *
+   * Qualifier and value are copied out with CellUtil. Cell.getQualifierArray and
+   * Cell.getValueArray return the whole backing array rather than just those
+   * bytes. An empty Result (missing row) renders as just the row prefix, because
+   * its rawCells() is null.
+   */
+  private def formatResult(result: Result): String = {
+    val prefix = Bytes.toString(result.getRow) + ":"
+    if (result.isEmpty) {
+      prefix
+    } else {
+      result.rawCells().map { cell =>
+        val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
+        val value = CellUtil.cloneValue(cell)
+        val rendered =
+          if (qualifier == "counter") Bytes.toLong(value).toString else Bytes.toString(value)
+        "(" + qualifier + "," + rendered + ")"
+      }.mkString(prefix, "", "")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
new file mode 100644
index 0000000..f19ad10
--- /dev/null
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.junit.*;
+
+import scala.Tuple2;
+
+import com.google.common.io.Files;
+
+/**
+ * Tests for {@link JavaHBaseContext}.
+ *
+ * Each test runs Spark in "local" mode against an HBase mini cluster. The
+ * cluster is started in {@link #setUp()} and shut down in {@link #tearDown()}.
+ * It holds a single table {@code t1} with column family {@code c}.
+ */
+public class JavaHBaseContextSuite implements Serializable {
+  private transient JavaSparkContext jsc;
+  HBaseTestingUtility htu;
+  protected static final Log LOG = LogFactory.getLog(JavaHBaseContextSuite.class);
+
+
+  byte[] tableName = Bytes.toBytes("t1");
+  byte[] columnFamily = Bytes.toBytes("c");
+  String columnFamilyStr = Bytes.toString(columnFamily);
+
+  /**
+   * Starts a local Spark context and an HBase mini cluster (1 master,
+   * 1 region server), then (re)creates the test table.
+   */
+  @Before
+  public void setUp() {
+    jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
+    jsc.addJar("spark.jar");
+
+    htu = HBaseTestingUtility.createLocalHTU();
+    try {
+      LOG.info("cleaning up test dir");
+
+      htu.cleanupTestDir();
+
+      LOG.info("starting minicluster");
+
+      htu.startMiniZKCluster();
+      htu.startMiniHBaseCluster(1, 1);
+
+      LOG.info(" - minicluster started");
+
+      try {
+        htu.deleteTable(TableName.valueOf(tableName));
+      } catch (Exception e) {
+        // Expected on a fresh cluster: there is no table to delete yet.
+        LOG.info(" - no table " + Bytes.toString(tableName) + " found");
+      }
+
+      LOG.info(" - creating table " + Bytes.toString(tableName));
+      htu.createTable(TableName.valueOf(tableName),
+          columnFamily);
+      LOG.info(" - created table");
+    } catch (Exception e1) {
+      throw new RuntimeException(e1);
+    }
+  }
+
+  /**
+   * Drops the test table and shuts down the mini cluster. The Spark context is
+   * stopped even if the cluster teardown fails.
+   */
+  @After
+  public void tearDown() {
+    try {
+      htu.deleteTable(TableName.valueOf(tableName));
+      LOG.info("shutting down minicluster");
+      htu.shutdownMiniHBaseCluster();
+      htu.shutdownMiniZKCluster();
+      LOG.info(" - minicluster shut down");
+      htu.cleanupTestDir();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+        jsc = null;
+      }
+    }
+  }
+
+  /** Row key used by the tests: the decimal string form of {@code i}. */
+  private static byte[] row(int i) {
+    return Bytes.toBytes(Integer.toString(i));
+  }
+
+  /** bulkPut must write every record of the RDD as a row of the table. */
+  @Test
+  public void testBulkPut() throws IOException {
+
+    // Records of the form "rowKey,family,qualifier,value".
+    List<String> list = new ArrayList<>();
+    for (int i = 1; i <= 5; i++) {
+      list.add(i + "," + columnFamilyStr + ",a," + i);
+    }
+
+    JavaRDD<String> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    try (Connection conn = ConnectionFactory.createConnection(conf)) {
+      // Make sure none of the rows exist before the bulk put runs.
+      try (Table table = conn.getTable(TableName.valueOf(tableName))) {
+        List<Delete> deletes = new ArrayList<>();
+        for (int i = 1; i < 6; i++) {
+          deletes.add(new Delete(row(i)));
+        }
+        table.delete(deletes);
+      }
+
+      hbaseContext.bulkPut(rdd,
+          TableName.valueOf(tableName),
+          new PutFunction());
+
+      try (Table table = conn.getTable(TableName.valueOf(tableName))) {
+        for (int i = 1; i < 6; i++) {
+          Result result = table.get(new Get(row(i)));
+          Assert.assertNotNull("Row " + i + " should have been put", result.getRow());
+          Assert.assertEquals("Row " + i + " has the wrong value",
+              Integer.toString(i),
+              Bytes.toString(result.getValue(columnFamily, Bytes.toBytes("a"))));
+        }
+      }
+    }
+  }
+
+  /** Parses "rowKey,family,qualifier,value" into a single-cell Put. */
+  public static class PutFunction implements Function<String, Put> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Put call(String v) throws Exception {
+      String[] cells = v.split(",");
+      Put put = new Put(Bytes.toBytes(cells[0]));
+
+      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
+          Bytes.toBytes(cells[3]));
+      return put;
+    }
+  }
+
+  /** bulkDelete must remove exactly the rows named in the RDD. */
+  @Test
+  public void testBulkDelete() throws IOException {
+    List<byte[]> list = new ArrayList<>();
+    list.add(row(1));
+    list.add(row(2));
+    list.add(row(3));
+
+    JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
+        new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
+
+    try (
+        Connection conn = ConnectionFactory.createConnection(conf);
+        Table table = conn.getTable(TableName.valueOf(tableName))
+    ) {
+      for (int i = 1; i <= 3; i++) {
+        Result result = table.get(new Get(row(i)));
+        Assert.assertNull("Row " + i + " should have been deleted", result.getRow());
+      }
+      for (int i = 4; i <= 5; i++) {
+        Result result = table.get(new Get(row(i)));
+        Assert.assertNotNull("Row " + i + " should not have been deleted", result.getRow());
+      }
+    }
+  }
+
+  /** hbaseRDD over a full-table Scan must return every populated row. */
+  @Test
+  public void testDistributedScan() throws IOException {
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    Scan scan = new Scan();
+    scan.setCaching(100);
+
+    JavaRDD<String> javaRdd =
+        hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
+            .map(new ScanConvertFunction());
+
+    List<String> results = javaRdd.collect();
+
+    Assert.assertEquals(5, results.size());
+  }
+
+  /** Maps a scanned (rowKey, Result) pair to the row key as a String. */
+  private static class ScanConvertFunction implements
+      Function<Tuple2<ImmutableBytesWritable, Result>, String> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
+      return Bytes.toString(v1._1().copyBytes());
+    }
+  }
+
+  /** bulkGet must produce one output record per requested row key. */
+  @Test
+  public void testBulkGet() throws IOException {
+    List<byte[]> list = new ArrayList<>();
+    for (int i = 1; i <= 5; i++) {
+      list.add(row(i));
+    }
+
+    JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    final JavaRDD<String> stringJavaRDD =
+        hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
+            new GetFunction(),
+            new ResultFunction());
+
+    Assert.assertEquals(5, stringJavaRDD.count());
+  }
+
+  /** Builds a Get for a raw row key. */
+  public static class GetFunction implements Function<byte[], Get> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Get call(byte[] v) throws Exception {
+      return new Get(v);
+    }
+  }
+
+  /**
+   * Renders a Result as "row:(qualifier,value)...". A "counter" qualifier's
+   * value is decoded as a long; all others are decoded as strings.
+   */
+  public static class ResultFunction implements Function<Result, String> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public String call(Result result) throws Exception {
+      StringBuilder b = new StringBuilder();
+      b.append(Bytes.toString(result.getRow())).append(":");
+
+      // listCells()/rawCells() return null rather than an empty collection when
+      // the Result holds no cells (e.g. the row does not exist).
+      if (result.isEmpty()) {
+        return b.toString();
+      }
+
+      for (Cell cell : result.rawCells()) {
+        String q = Bytes.toString(CellUtil.cloneQualifier(cell));
+        String value = "counter".equals(q)
+            ? Long.toString(Bytes.toLong(CellUtil.cloneValue(cell)))
+            : Bytes.toString(CellUtil.cloneValue(cell));
+        b.append("(").append(q).append(",").append(value).append(")");
+      }
+      return b.toString();
+    }
+  }
+
+  /** Writes rows "1".."5", each with one cell c:c = "c". */
+  private void populateTableWithMockData(Configuration conf, TableName tableName)
+      throws IOException {
+    try (
+        Connection conn = ConnectionFactory.createConnection(conf);
+        Table table = conn.getTable(tableName)) {
+
+      List<Put> puts = new ArrayList<>();
+
+      for (int i = 1; i < 6; i++) {
+        Put put = new Put(row(i));
+        put.addColumn(columnFamily, columnFamily, columnFamily);
+        puts.add(put);
+      }
+      table.put(puts);
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala
new file mode 100644
index 0000000..b27cfc7
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{ CellUtil, TableName, HBaseTestingUtility}
+import org.apache.spark.{SparkException, Logging, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+class HBaseContextSuite extends FunSuite with
+BeforeAndAfterEach with BeforeAndAfterAll with Logging {
+
+ @transient var sc: SparkContext = null
+ var TEST_UTIL = new HBaseTestingUtility
+
+ val tableName = "t1"
+ val columnFamily = "c"
+
+ override def beforeAll() {
+ TEST_UTIL.startMiniCluster()
+ logInfo(" - minicluster started")
+
+ try {
+ TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+ } catch {
+ case _: Exception =>
+ logInfo(" - no table " + tableName + " found")
+ }
+ logInfo(" - creating table " + tableName)
+ TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
+ logInfo(" - created table")
+
+ val envMap = Map[String,String](("Xmx", "512m"))
+
+ sc = new SparkContext("local", "test", null, Nil, envMap)
+ }
+
+ override def afterAll() {
+ // Always release the SparkContext, even if mini-cluster teardown throws.
+ try {
+ logInfo("shutting down minicluster")
+ TEST_UTIL.shutdownMiniCluster()
+ logInfo(" - minicluster shut down")
+ TEST_UTIL.cleanupTestDir()
+ } finally {
+ if (sc != null) sc.stop()
+ }
+ }
+
+ /**
+ * Opens a connection to the mini cluster and a handle on `tableName`, runs `f` with the
+ * table, and closes both afterwards even if `f` throws.
+ */
+ private def withTable[T](f: Table => T): T = {
+ val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
+ try {
+ val table = connection.getTable(TableName.valueOf(tableName))
+ try f(table) finally table.close()
+ } finally {
+ connection.close()
+ }
+ }
+
+ /** Writes `columnFamily:qualifier = value` for each `(row, value)` pair, one Put per row. */
+ private def putRows(table: Table, qualifier: String, rows: Seq[(String, String)]): Unit = {
+ rows.foreach { case (row, value) =>
+ val put = new Put(Bytes.toBytes(row))
+ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(value))
+ table.put(put)
+ }
+ }
+
+ /** Latest value of `columnFamily:qualifier` in `row` as a string, or None if there is none. */
+ private def latestValue(table: Table, row: String, qualifier: String): Option[String] = {
+ val cell = table.get(new Get(Bytes.toBytes(row))).
+ getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier))
+ Option(cell).map(c => Bytes.toString(CellUtil.cloneValue(c)))
+ }
+
+ test("bulkput to test HBase client") {
+ val config = TEST_UTIL.getConfiguration
+ val rdd = sc.parallelize(Array(
+ (Bytes.toBytes("1"),
+ Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
+ (Bytes.toBytes("2"),
+ Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))),
+ (Bytes.toBytes("3"),
+ Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))),
+ (Bytes.toBytes("4"),
+ Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))),
+ (Bytes.toBytes("5"),
+ Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
+
+ val hbaseContext = new HBaseContext(sc, config)
+ hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
+ TableName.valueOf(tableName),
+ (putRecord) => {
+ val put = new Put(putRecord._1)
+ putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
+ put
+ })
+
+ withTable { table =>
+ assert(latestValue(table, "1", "a") == Some("foo1"))
+ assert(latestValue(table, "2", "b") == Some("foo2"))
+ assert(latestValue(table, "3", "c") == Some("foo3"))
+ assert(latestValue(table, "4", "d") == Some("foo"))
+ assert(latestValue(table, "5", "e") == Some("bar"))
+ }
+ }
+
+ test("bulkDelete to test HBase client") {
+ val config = TEST_UTIL.getConfiguration
+ withTable { table =>
+ putRows(table, "a", Seq("delete1" -> "foo1", "delete2" -> "foo2", "delete3" -> "foo3"))
+ }
+
+ val rdd = sc.parallelize(Array(
+ Bytes.toBytes("delete1"),
+ Bytes.toBytes("delete3")))
+
+ val hbaseContext = new HBaseContext(sc, config)
+ hbaseContext.bulkDelete[Array[Byte]](rdd,
+ TableName.valueOf(tableName),
+ rowKey => new Delete(rowKey),
+ 4)
+
+ withTable { table =>
+ assert(latestValue(table, "delete1", "a").isEmpty)
+ assert(latestValue(table, "delete3", "a").isEmpty)
+ assert(latestValue(table, "delete2", "a") == Some("foo2"))
+ }
+ }
+
+ test("bulkGet to test HBase client") {
+ val config = TEST_UTIL.getConfiguration
+ withTable { table =>
+ putRows(table, "a", Seq("get1" -> "foo1", "get2" -> "foo2", "get3" -> "foo3"))
+ }
+
+ val rdd = sc.parallelize(Array(
+ Bytes.toBytes("get1"),
+ Bytes.toBytes("get2"),
+ Bytes.toBytes("get3"),
+ Bytes.toBytes("get4")))
+ val hbaseContext = new HBaseContext(sc, config)
+
+ val getRdd = hbaseContext.bulkGet[Array[Byte], String](
+ TableName.valueOf(tableName),
+ 2,
+ rdd,
+ record => new Get(record),
+ (result: Result) => {
+ val cells = result.listCells()
+ if (cells == null) {
+ // listCells() is null when the Result has no cells, e.g. the row does not exist.
+ ""
+ } else {
+ val b = new StringBuilder
+ b.append(Bytes.toString(result.getRow)).append(":")
+ val it = cells.iterator()
+ while (it.hasNext) {
+ val cell = it.next()
+ val q = Bytes.toString(CellUtil.cloneQualifier(cell))
+ val value = CellUtil.cloneValue(cell)
+ val rendered = if (q == "counter") Bytes.toLong(value).toString else Bytes.toString(value)
+ b.append("(").append(q).append(",").append(rendered).append(")")
+ }
+ b.toString
+ }
+ })
+ val getArray = getRdd.collect()
+
+ assert(getArray.length == 4)
+ assert(getArray.contains("get1:(a,foo1)"))
+ assert(getArray.contains("get2:(a,foo2)"))
+ assert(getArray.contains("get3:(a,foo3)"))
+ // get4 was never written, so the converter maps its empty Result to "".
+ assert(getArray.contains(""))
+ }
+
+ test("BulkGet failure test: bad table") {
+ val config = TEST_UTIL.getConfiguration
+
+ val rdd = sc.parallelize(Array(
+ Bytes.toBytes("get1"),
+ Bytes.toBytes("get2"),
+ Bytes.toBytes("get3"),
+ Bytes.toBytes("get4")))
+ val hbaseContext = new HBaseContext(sc, config)
+
+ // intercept fails the test if no SparkException is thrown, and returns it otherwise.
+ val ex = intercept[SparkException] {
+ hbaseContext.bulkGet[Array[Byte], String](
+ TableName.valueOf("badTableName"),
+ 2,
+ rdd,
+ record => new Get(record),
+ (result: Result) => "1").collect()
+ }
+ assert(ex.getMessage.contains(
+ "org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException"))
+ }
+
+ test("BulkGet failure test: bad column") {
+ val config = TEST_UTIL.getConfiguration
+ withTable { table =>
+ putRows(table, "a", Seq("get1" -> "foo1", "get2" -> "foo2", "get3" -> "foo3"))
+ }
+
+ val rdd = sc.parallelize(Array(
+ Bytes.toBytes("get1"),
+ Bytes.toBytes("get2"),
+ Bytes.toBytes("get3"),
+ Bytes.toBytes("get4")))
+ val hbaseContext = new HBaseContext(sc, config)
+
+ // The family is spelled "c" instead of columnFamily so the converter does not reference
+ // the suite (NOTE(review): presumably so Spark never has to serialize the suite).
+ val getRdd = hbaseContext.bulkGet[Array[Byte], String](
+ TableName.valueOf(tableName),
+ 2,
+ rdd,
+ record => new Get(record),
+ (result: Result) => {
+ if (result.listCells() != null) {
+ val cellValue = result.getColumnLatestCell(
+ Bytes.toBytes("c"), Bytes.toBytes("bad_column"))
+ if (cellValue == null) "null" else "bad"
+ } else "noValue"
+ })
+ val results = getRdd.collect()
+ // get1..get3 exist but have no "bad_column"; get4 does not exist at all.
+ assert(results.count(_ == "null") == 3)
+ assert(results.count(_ == "noValue") == 1)
+ }
+
+ test("distributedScan to test HBase client") {
+ val config = TEST_UTIL.getConfiguration
+ withTable { table =>
+ putRows(table, "a", Seq(
+ "scan1" -> "foo1",
+ "scan2" -> "foo2",
+ "scan3" -> "foo3",
+ "scan4" -> "foo3",
+ "scan5" -> "foo3"))
+ }
+
+ val hbaseContext = new HBaseContext(sc, config)
+
+ val scan = new Scan()
+ scan.setCaching(100)
+ scan.setStartRow(Bytes.toBytes("scan2"))
+ scan.setStopRow(Bytes.toBytes("scan4_"))
+
+ val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
+
+ // Only scan2, scan3 and scan4 fall inside [scan2, scan4_).
+ val scanList = scanRdd.map(r => r._1.copyBytes()).collect()
+ assert(scanList.length == 3)
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala
new file mode 100644
index 0000000..007aa84
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseTestingUtility}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+import scala.collection.mutable
+
+class HBaseDStreamFunctionsSuite extends FunSuite with
+BeforeAndAfterEach with BeforeAndAfterAll with Logging {
+ @transient var sc: SparkContext = null
+
+ var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
+
+ val tableName = "t1"
+ val columnFamily = "c"
+
+ override def beforeAll() {
+ TEST_UTIL.startMiniCluster()
+
+ logInfo(" - minicluster started")
+ try {
+ TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+ } catch {
+ case _: Exception => logInfo(" - no table " + tableName + " found")
+ }
+ logInfo(" - creating table " + tableName)
+ TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
+ logInfo(" - created table")
+
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ // Tear down in order, but keep going if an earlier step throws so nothing is leaked.
+ try {
+ try {
+ TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+ } finally {
+ TEST_UTIL.shutdownMiniCluster()
+ }
+ } finally {
+ if (sc != null) sc.stop()
+ }
+ }
+
+ /** Latest value of `columnFamily:qualifier` in `row` as a string, or None if there is none. */
+ private def latestValue(table: Table, row: String, qualifier: String): Option[String] = {
+ val cell = table.get(new Get(Bytes.toBytes(row))).
+ getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier))
+ Option(cell).map(c => Bytes.toString(CellUtil.cloneValue(c)))
+ }
+
+ test("bulkput to test HBase client") {
+ val config = TEST_UTIL.getConfiguration
+ val rdd1 = sc.parallelize(Array(
+ (Bytes.toBytes("1"),
+ Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
+ (Bytes.toBytes("2"),
+ Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))),
+ (Bytes.toBytes("3"),
+ Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3"))))))
+
+ val rdd2 = sc.parallelize(Array(
+ (Bytes.toBytes("4"),
+ Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))),
+ (Bytes.toBytes("5"),
+ Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
+
+ val hbaseContext = new HBaseContext(sc, config)
+ val ssc = new StreamingContext(sc, Milliseconds(200))
+
+ try {
+ val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
+ Array[Byte], Array[Byte])])]]()
+ queue += rdd1
+ queue += rdd2
+ val dStream = ssc.queueStream(queue)
+
+ dStream.hbaseBulkPut(
+ hbaseContext,
+ TableName.valueOf(tableName),
+ (putRecord) => {
+ val put = new Put(putRecord._1)
+ putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
+ put
+ })
+
+ ssc.start()
+
+ // Give the queued batches time to be processed before reading the table back.
+ ssc.awaitTerminationOrTimeout(1000)
+
+ val connection = ConnectionFactory.createConnection(config)
+ try {
+ val table = connection.getTable(TableName.valueOf(tableName))
+ try {
+ assert(latestValue(table, "1", "a") == Some("foo1"))
+ assert(latestValue(table, "2", "b") == Some("foo2"))
+ assert(latestValue(table, "3", "c") == Some("foo3"))
+ assert(latestValue(table, "4", "d") == Some("foo"))
+ assert(latestValue(table, "5", "e") == Some("bar"))
+ } finally {
+ table.close()
+ }
+ } finally {
+ connection.close()
+ }
+ } finally {
+ // Stop streaming but keep the shared SparkContext; afterAll stops that one.
+ ssc.stop(stopSparkContext = false)
+ }
+ }
+
+}
\ No newline at end of file