You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2015/07/28 18:47:05 UTC

[3/3] hbase git commit: HBASE-13992 Integrate SparkOnHBase into HBase

HBASE-13992 Integrate SparkOnHBase into HBase

Signed-off-by: Sean Busbey <bu...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30f7d127
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30f7d127
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30f7d127

Branch: refs/heads/master
Commit: 30f7d127c3974cff9e3058e13d7c50805ee4482f
Parents: 6b9b7cb
Author: Ted Malaska <te...@cloudera.com>
Authored: Tue Jul 28 11:10:37 2015 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Tue Jul 28 11:45:23 2015 -0500

----------------------------------------------------------------------
 dev-support/test-patch.properties               |   3 +-
 hbase-spark/pom.xml                             | 572 +++++++++++++++++++
 .../JavaHBaseBulkDeleteExample.java             |  80 +++
 .../hbasecontext/JavaHBaseBulkGetExample.java   | 115 ++++
 .../hbasecontext/JavaHBaseBulkPutExample.java   |  90 +++
 .../hbasecontext/JavaHBaseDistributedScan.java  |  81 +++
 .../hbasecontext/JavaHBaseMapGetPutExample.java | 105 ++++
 .../JavaHBaseStreamingBulkPutExample.java       |  90 +++
 .../hadoop/hbase/spark/HBaseContext.scala       | 570 ++++++++++++++++++
 .../hbase/spark/HBaseDStreamFunctions.scala     | 158 +++++
 .../hadoop/hbase/spark/HBaseRDDFunctions.scala  | 162 ++++++
 .../hadoop/hbase/spark/JavaHBaseContext.scala   | 347 +++++++++++
 .../hbasecontext/HBaseBulkDeleteExample.scala   |  63 ++
 .../hbasecontext/HBaseBulkGetExample.scala      |  93 +++
 .../hbasecontext/HBaseBulkPutExample.scala      |  75 +++
 .../HBaseBulkPutExampleFromFile.scala           |  76 +++
 .../HBaseBulkPutTimestampExample.scala          |  77 +++
 .../HBaseDistributedScanExample.scala           |  61 ++
 .../HBaseStreamingBulkPutExample.scala          |  74 +++
 .../example/rdd/HBaseBulkDeleteExample.scala    |  64 +++
 .../spark/example/rdd/HBaseBulkGetExample.scala |  88 +++
 .../spark/example/rdd/HBaseBulkPutExample.scala |  76 +++
 .../rdd/HBaseForeachPartitionExample.scala      |  83 +++
 .../example/rdd/HBaseMapPartitionExample.scala  |  89 +++
 .../hbase/spark/JavaHBaseContextSuite.java      | 334 +++++++++++
 .../hadoop/hbase/spark/HBaseContextSuite.scala  | 344 +++++++++++
 .../spark/HBaseDStreamFunctionsSuite.scala      | 129 +++++
 .../hbase/spark/HBaseRDDFunctionsSuite.scala    | 398 +++++++++++++
 pom.xml                                         |   1 +
 29 files changed, 4497 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/dev-support/test-patch.properties
----------------------------------------------------------------------
diff --git a/dev-support/test-patch.properties b/dev-support/test-patch.properties
index c652e3f..7e75965 100644
--- a/dev-support/test-patch.properties
+++ b/dev-support/test-patch.properties
@@ -21,7 +21,8 @@ MAVEN_OPTS="${MAVEN_OPTS:-"-Xmx3100M"}"
 OK_RELEASEAUDIT_WARNINGS=0
 # Allow four warnings.  Javadoc complains about sun.misc.Unsafe use.
 # See HBASE-7457, HBASE-13761
-OK_JAVADOC_WARNINGS=7
+# Allow 2 additional warnings for Scala stub notice about MR. See HBASE-13992
+OK_JAVADOC_WARNINGS=9
 
 MAX_LINE_LENGTH=100
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml
new file mode 100644
index 0000000..e48f9e8
--- /dev/null
+++ b/hbase-spark/pom.xml
@@ -0,0 +1,572 @@
+<?xml version="1.0"?>
+
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation=
+                 "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>hbase</artifactId>
+        <groupId>org.apache.hbase</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <artifactId>hbase-spark</artifactId>
+    <name>Apache HBase - Spark</name>
+
+    <properties>
+        <spark.version>1.3.0</spark.version>
+        <scala.version>2.10.4</scala.version>
+        <scala.binary.version>2.10</scala.binary.version>
+        <top.dir>${project.basedir}/..</top.dir>
+    </properties>
+
+    <dependencies>
+        <!-- Force import of Spark's servlet API for unit tests -->
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>javax.servlet-api</artifactId>
+            <version>3.0.1</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Mark Spark / Scala as provided -->
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <!-- make sure wrong scala version is not pulled in -->
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+                <exclusion>
+                    <!-- make sure wrong scala version is not pulled in -->
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scalap</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <type>test-jar</type>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <version>2.2.4</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalamock</groupId>
+            <artifactId>scalamock-scalatest-support_${scala.binary.version}</artifactId>
+            <version>3.1.4</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop-two.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>jsp-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby</groupId>
+                    <artifactId>jruby-complete</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jboss.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop-two.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>jsp-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby</groupId>
+                    <artifactId>jruby-complete</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jboss.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop-two.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>jsp-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby</groupId>
+                    <artifactId>jruby-complete</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jboss.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop-two.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>jsp-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby</groupId>
+                    <artifactId>jruby-complete</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jboss.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>thrift</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby</groupId>
+                    <artifactId>jruby-complete</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jsp-2.1</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jsp-api-2.1</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>servlet-api-2.5</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-json</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>tomcat</groupId>
+                    <artifactId>jasper-runtime</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>tomcat</groupId>
+                    <artifactId>jasper-compiler</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby</groupId>
+                    <artifactId>jruby-complete</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jboss.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-hadoop-compat</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>thrift</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby</groupId>
+                    <artifactId>jruby-complete</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jsp-2.1</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jsp-api-2.1</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>servlet-api-2.5</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-json</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>tomcat</groupId>
+                    <artifactId>jasper-runtime</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>tomcat</groupId>
+                    <artifactId>jasper-compiler</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby</groupId>
+                    <artifactId>jruby-complete</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jboss.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-hadoop2-compat</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>thrift</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby</groupId>
+                    <artifactId>jruby-complete</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jsp-2.1</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jsp-api-2.1</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>servlet-api-2.5</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-json</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>tomcat</groupId>
+                    <artifactId>jasper-runtime</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>tomcat</groupId>
+                    <artifactId>jasper-compiler</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby</groupId>
+                    <artifactId>jruby-complete</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jboss.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-it</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <testSourceDirectory>src/test/scala</testSourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.2.0</version>
+                <configuration>
+                    <charset>${project.build.sourceEncoding}</charset>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>1.0</version>
+                <configuration>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <filereports>WDF TestSuite.txt</filereports>
+                    <parallel>false</parallel>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <skipTests>true</skipTests>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>integration-test</id>
+                        <phase>integration-test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <tagsToExclude>Integration-Test</tagsToExclude>
+                            <argLine>
+                                -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+                            </argLine>
+                            <parallel>false</parallel>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java
new file mode 100644
index 0000000..68b2edd
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is a simple example of deleting records in HBase
+ * with the bulkDelete function.
+ */
+final public class JavaHBaseBulkDeleteExample {
+
+  private JavaHBaseBulkDeleteExample() {}
+
+  public static void main(String[] args) {
+    if (args.length < 1) {
+      System.out.println("JavaHBaseBulkDeleteExample  {tableName}");
+      return;
+    }
+
+    String tableName = args[0];
+
+    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkDeleteExample " + tableName);
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+    try {
+      List<byte[]> list = new ArrayList<>();
+      list.add(Bytes.toBytes("1"));
+      list.add(Bytes.toBytes("2"));
+      list.add(Bytes.toBytes("3"));
+      list.add(Bytes.toBytes("4"));
+      list.add(Bytes.toBytes("5"));
+
+      JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+      Configuration conf = HBaseConfiguration.create();
+
+      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+      hbaseContext.bulkDelete(rdd,
+              TableName.valueOf(tableName), new DeleteFunction(), 4);
+    } finally {
+      jsc.stop();
+    }
+
+  }
+
+  public static class DeleteFunction implements Function<byte[], Delete> {
+    private static final long serialVersionUID = 1L;
+    public Delete call(byte[] v) throws Exception {
+      return new Delete(v);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java
new file mode 100644
index 0000000..c7dcbb6
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * This is a simple example of getting records in HBase
+ * with the bulkGet function.
+ */
+final public class JavaHBaseBulkGetExample {
+
+  private JavaHBaseBulkGetExample() {}
+
+  public static void main(String[] args) {
+    if (args.length < 1) {
+      System.out.println("JavaHBaseBulkGetExample  {tableName}");
+      return;
+    }
+
+    String tableName = args[0];
+
+    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName);
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+    try {
+      List<byte[]> list = new ArrayList<>();
+      list.add(Bytes.toBytes("1"));
+      list.add(Bytes.toBytes("2"));
+      list.add(Bytes.toBytes("3"));
+      list.add(Bytes.toBytes("4"));
+      list.add(Bytes.toBytes("5"));
+
+      JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+      Configuration conf = HBaseConfiguration.create();
+
+      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+      hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd, new GetFunction(),
+              new ResultFunction());
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  public static class GetFunction implements Function<byte[], Get> {
+
+    private static final long serialVersionUID = 1L;
+
+    public Get call(byte[] v) throws Exception {
+      return new Get(v);
+    }
+  }
+
+  public static class ResultFunction implements Function<Result, String> {
+
+    private static final long serialVersionUID = 1L;
+
+    public String call(Result result) throws Exception {
+      Iterator<Cell> it = result.listCells().iterator();
+      StringBuilder b = new StringBuilder();
+
+      b.append(Bytes.toString(result.getRow())).append(":");
+
+      while (it.hasNext()) {
+        Cell cell = it.next();
+        String q = Bytes.toString(cell.getQualifierArray());
+        if (q.equals("counter")) {
+          b.append("(")
+                  .append(Bytes.toString(cell.getQualifierArray()))
+                  .append(",")
+                  .append(Bytes.toLong(cell.getValueArray()))
+                  .append(")");
+        } else {
+          b.append("(")
+                  .append(Bytes.toString(cell.getQualifierArray()))
+                  .append(",")
+                  .append(Bytes.toString(cell.getValueArray()))
+                  .append(")");
+        }
+      }
+      return b.toString();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
new file mode 100644
index 0000000..ded5081
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * This is a simple example of putting records in HBase
+ * with the bulkPut function.
+ */
+final public class JavaHBaseBulkPutExample {
+
+  private JavaHBaseBulkPutExample() {}
+
+  public static void main(String[] args) {
+    if (args.length < 2) {
+      System.out.println("JavaHBaseBulkPutExample  " +
+              "{tableName} {columnFamily}");
+      return;
+    }
+
+    String tableName = args[0];
+    String columnFamily = args[1];
+
+    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkPutExample " + tableName);
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+    try {
+      List<String> list = new ArrayList<>();
+      list.add("1," + columnFamily + ",a,1");
+      list.add("2," + columnFamily + ",a,2");
+      list.add("3," + columnFamily + ",a,3");
+      list.add("4," + columnFamily + ",a,4");
+      list.add("5," + columnFamily + ",a,5");
+
+      JavaRDD<String> rdd = jsc.parallelize(list);
+
+      Configuration conf = HBaseConfiguration.create();
+
+      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+      hbaseContext.bulkPut(rdd,
+              TableName.valueOf(tableName),
+              new PutFunction());
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  public static class PutFunction implements Function<String, Put> {
+
+    private static final long serialVersionUID = 1L;
+
+    public Put call(String v) throws Exception {
+      String[] cells = v.split(",");
+      Put put = new Put(Bytes.toBytes(cells[0]));
+
+      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
+              Bytes.toBytes(cells[3]));
+      return put;
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
new file mode 100644
index 0000000..6192ad9
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.apache.spark.api.java.function.Function;
+import scala.Tuple2;
+
+/**
+ * This is a simple example of scanning records from HBase
+ * with the hbaseRDD function.
+ */
+final public class JavaHBaseDistributedScan {
+
+  private JavaHBaseDistributedScan() {}
+
+  public static void main(String[] args) {
+    if (args.length < 1) {
+      System.out.println("JavaHBaseDistributedScan {tableName}");
+      return;
+    }
+
+    String tableName = args[0];
+
+    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseDistributedScan " + tableName);
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+    try {
+      Configuration conf = HBaseConfiguration.create();
+
+      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+      Scan scan = new Scan();
+      scan.setCaching(100);
+
+      JavaRDD<Tuple2<ImmutableBytesWritable, Result>> javaRdd =
+              hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan);
+
+      List<String> results = javaRdd.map(new ScanConvertFunction()).collect();
+
+      System.out.println("Result Size: " + results.size());
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  private static class ScanConvertFunction implements
+          Function<Tuple2<ImmutableBytesWritable, Result>, String> {
+    @Override
+    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
+      return Bytes.toString(v1._1().copyBytes());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
new file mode 100644
index 0000000..0d41a70
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+
+import scala.Tuple2;
+
+/**
+ * This is a simple example of using the foreachPartition
+ * method with a HBase connection
+ */
+final public class JavaHBaseMapGetPutExample {
+
+  private JavaHBaseMapGetPutExample() {}
+
+  public static void main(String[] args) {
+    if (args.length < 1) {
+      System.out.println("JavaHBaseBulkGetExample {tableName}");
+      return;
+    }
+
+    final String tableName = args[0];
+
+    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName);
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+    try {
+      List<byte[]> list = new ArrayList<>();
+      list.add(Bytes.toBytes("1"));
+      list.add(Bytes.toBytes("2"));
+      list.add(Bytes.toBytes("3"));
+      list.add(Bytes.toBytes("4"));
+      list.add(Bytes.toBytes("5"));
+
+      JavaRDD<byte[]> rdd = jsc.parallelize(list);
+      Configuration conf = HBaseConfiguration.create();
+
+      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+      hbaseContext.foreachPartition(rdd,
+              new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
+        public void call(Tuple2<Iterator<byte[]>, Connection> t)
+                throws Exception {
+          Table table = t._2().getTable(TableName.valueOf(tableName));
+          BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
+
+          while (t._1().hasNext()) {
+            byte[] b = t._1().next();
+            Result r = table.get(new Get(b));
+            if (r.getExists()) {
+              mutator.mutate(new Put(b));
+            }
+          }
+
+          mutator.flush();
+          mutator.close();
+          table.close();
+        }
+      });
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  public static class GetFunction implements Function<byte[], Get> {
+    private static final long serialVersionUID = 1L;
+    public Get call(byte[] v) throws Exception {
+      return new Get(v);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
new file mode 100644
index 0000000..cd4cf24
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ * This is a simple example of BulkPut with Spark Streaming
+ */
+final public class JavaHBaseStreamingBulkPutExample {
+
+  private JavaHBaseStreamingBulkPutExample() {}
+
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      System.out.println("JavaHBaseBulkPutExample  " +
+              "{host} {port} {tableName}");
+      return;
+    }
+
+    String host = args[0];
+    String port = args[1];
+    String tableName = args[2];
+
+    SparkConf sparkConf =
+            new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
+                    tableName + ":" + port + ":" + tableName);
+
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+    try {
+      JavaStreamingContext jssc =
+              new JavaStreamingContext(jsc, new Duration(1000));
+
+      JavaReceiverInputDStream<String> javaDstream =
+              jssc.socketTextStream(host, Integer.parseInt(port));
+
+      Configuration conf = HBaseConfiguration.create();
+
+      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+      hbaseContext.streamBulkPut(javaDstream,
+              TableName.valueOf(tableName),
+              new PutFunction());
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  public static class PutFunction implements Function<String, Put> {
+
+    private static final long serialVersionUID = 1L;
+
+    public Put call(String v) throws Exception {
+      String[] part = v.split(",");
+      Put put = new Put(Bytes.toBytes(part[0]));
+
+      put.addColumn(Bytes.toBytes(part[1]),
+              Bytes.toBytes(part[2]),
+              Bytes.toBytes(part[3]));
+      return put;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
new file mode 100644
index 0000000..f060fea
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.ConnectionFactory
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.client.Get
+import org.apache.hadoop.hbase.client.Result
+import scala.reflect.ClassTag
+import org.apache.hadoop.hbase.client.Connection
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.client.Delete
+import org.apache.spark.{Logging, SerializableWritable, SparkContext}
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.hbase.client.Mutation
+import org.apache.spark.streaming.dstream.DStream
+import java.io._
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat
+import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+  * HBaseContext is a façade for HBase operations
+  * like bulk put, get, increment, delete, and scan
+  *
+  * HBaseContext will take the responsibilities
+  * of disseminating the configuration information
+  * to the working and managing the life cycle of HConnections.
+ */
+class HBaseContext(@transient sc: SparkContext,
+                   @transient config: Configuration,
+                   val tmpHdfsConfgFile: String = null)
+  extends Serializable with Logging {
+
+  @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+  @transient var tmpHdfsConfiguration:Configuration = config
+  @transient var appliedCredentials = false
+  @transient val job = Job.getInstance(config)
+  TableMapReduceUtil.initCredentials(job)
+  val broadcastedConf = sc.broadcast(new SerializableWritable(config))
+  val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials))
+
+  if (tmpHdfsConfgFile != null && config != null) {
+    val fs = FileSystem.newInstance(config)
+    val tmpPath = new Path(tmpHdfsConfgFile)
+    if (!fs.exists(tmpPath)) {
+      val outputStream = fs.create(tmpPath)
+      config.write(outputStream)
+      outputStream.close()
+    } else {
+      logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!")
+    }
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark RDD foreachPartition.
+   * This function differs from the original in that it offers the
+   * developer access to a already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param rdd  Original RDD with data to iterate over
+   * @param f    Function to be given a iterator to iterate through
+   *             the RDD values and a HConnection object to interact
+   *             with HBase
+   */
+  def foreachPartition[T](rdd: RDD[T],
+                          f: (Iterator[T], Connection) => Unit):Unit = {
+    rdd.foreachPartition(
+      it => hbaseForeachPartition(broadcastedConf, it, f))
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming dStream foreach
+   * This function differs from the original in that it offers the
+   * developer access to a already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param dstream  Original DStream with data to iterate over
+   * @param f        Function to be given a iterator to iterate through
+   *                 the DStream values and a HConnection object to
+   *                 interact with HBase
+   */
+  def foreachPartition[T](dstream: DStream[T],
+                    f: (Iterator[T], Connection) => Unit):Unit = {
+    dstream.foreachRDD((rdd, time) => {
+      foreachPartition(rdd, f)
+    })
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark RDD mapPartition.
+   * This function differs from the original in that it offers the
+   * developer access to a already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param rdd  Original RDD with data to iterate over
+   * @param mp   Function to be given a iterator to iterate through
+   *             the RDD values and a HConnection object to interact
+   *             with HBase
+   * @return     Returns a new RDD generated by the user definition
+   *             function just like normal mapPartition
+   */
+  def mapPartitions[T, R: ClassTag](rdd: RDD[T],
+                                   mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = {
+
+    rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
+      it,
+      mp))
+
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming DStream
+   * foreachPartition.
+   *
+   * This function differs from the original in that it offers the
+   * developer access to a already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * Note: Make sure to partition correctly to avoid memory issue when
+   *       getting data from HBase
+   *
+   * @param dstream  Original DStream with data to iterate over
+   * @param f       Function to be given a iterator to iterate through
+   *                 the DStream values and a HConnection object to
+   *                 interact with HBase
+   * @return         Returns a new DStream generated by the user
+   *                 definition function just like normal mapPartition
+   */
+  def streamForeachPartition[T](dstream: DStream[T],
+                                f: (Iterator[T], Connection) => Unit): Unit = {
+
+    dstream.foreachRDD(rdd => this.foreachPartition(rdd, f))
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming DStream
+   * mapPartition.
+   *
+   * This function differs from the original in that it offers the
+   * developer access to a already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * Note: Make sure to partition correctly to avoid memory issue when
+   *       getting data from HBase
+   *
+   * @param dstream  Original DStream with data to iterate over
+   * @param f       Function to be given a iterator to iterate through
+   *                 the DStream values and a HConnection object to
+   *                 interact with HBase
+   * @return         Returns a new DStream generated by the user
+   *                 definition function just like normal mapPartition
+   */
+  def streamMapPartitions[T, U: ClassTag](dstream: DStream[T],
+                                f: (Iterator[T], Connection) => Iterator[U]):
+  DStream[U] = {
+    dstream.mapPartitions(it => hbaseMapPartition[T, U](
+      broadcastedConf,
+      it,
+      f))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.foreachPartition method.
+   *
+   * It allow addition support for a user to take RDD
+   * and generate puts and send them to HBase.
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param rdd       Original RDD with data to iterate over
+   * @param tableName The name of the table to put into
+   * @param f         Function to convert a value in the RDD to a HBase Put
+   */
+  def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) {
+
+    val tName = tableName.getName
+    rdd.foreachPartition(
+      it => hbaseForeachPartition[T](
+        broadcastedConf,
+        it,
+        (iterator, connection) => {
+          val m = connection.getBufferedMutator(TableName.valueOf(tName))
+          iterator.foreach(T => m.mutate(f(T)))
+          m.flush()
+          m.close()
+        }))
+  }
+
+  def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){
+    credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+
+    logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials)
+
+    if (!appliedCredentials && credentials != null) {
+      appliedCredentials = true
+
+      @transient val ugi = UserGroupInformation.getCurrentUser
+      ugi.addCredentials(credentials)
+      // specify that this is a proxy user
+      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
+
+      ugi.addCredentials(credentialsConf.value.value)
+    }
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamMapPartition method.
+   *
+   * It allow addition support for a user to take a DStream and
+   * generate puts and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param dstream    Original DStream with data to iterate over
+   * @param tableName  The name of the table to put into
+   * @param f          Function to convert a value in
+   *                   the DStream to a HBase Put
+   */
+  def streamBulkPut[T](dstream: DStream[T],
+                       tableName: TableName,
+                       f: (T) => Put) = {
+    val tName = tableName.getName
+    dstream.foreachRDD((rdd, time) => {
+      bulkPut(rdd, TableName.valueOf(tName), f)
+    })
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.foreachPartition method.
+   *
+   * It allow addition support for a user to take a RDD and generate delete
+   * and send them to HBase.  The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param rdd       Original RDD with data to iterate over
+   * @param tableName The name of the table to delete from
+   * @param f         Function to convert a value in the RDD to a
+   *                  HBase Deletes
+   * @param batchSize       The number of delete to batch before sending to HBase
+   */
+  def bulkDelete[T](rdd: RDD[T], tableName: TableName,
+                    f: (T) => Delete, batchSize: Integer) {
+    bulkMutation(rdd, tableName, f, batchSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamBulkMutation method.
+   *
+   * It allow addition support for a user to take a DStream and
+   * generate Delete and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param dstream    Original DStream with data to iterate over
+   * @param tableName  The name of the table to delete from
+   * @param f          function to convert a value in the DStream to a
+   *                   HBase Delete
+   * @param batchSize        The number of deletes to batch before sending to HBase
+   */
+  def streamBulkDelete[T](dstream: DStream[T],
+                          tableName: TableName,
+                          f: (T) => Delete,
+                          batchSize: Integer) = {
+    streamBulkMutation(dstream, tableName, f, batchSize)
+  }
+
+  /**
+   *  Under lining function to support all bulk mutations
+   *
+   *  May be opened up if requested
+   */
+  private def bulkMutation[T](rdd: RDD[T], tableName: TableName,
+                              f: (T) => Mutation, batchSize: Integer) {
+
+    val tName = tableName.getName
+    rdd.foreachPartition(
+      it => hbaseForeachPartition[T](
+        broadcastedConf,
+        it,
+        (iterator, connection) => {
+          val table = connection.getTable(TableName.valueOf(tName))
+          val mutationList = new java.util.ArrayList[Mutation]
+          iterator.foreach(T => {
+            mutationList.add(f(T))
+            if (mutationList.size >= batchSize) {
+              table.batch(mutationList, null)
+              mutationList.clear()
+            }
+          })
+          if (mutationList.size() > 0) {
+            table.batch(mutationList, null)
+            mutationList.clear()
+          }
+          table.close()
+        }))
+  }
+
+  /**
+   *  Under lining function to support all bulk streaming mutations
+   *
+   *  May be opened up if requested
+   */
+  private def streamBulkMutation[T](dstream: DStream[T],
+                                    tableName: TableName,
+                                    f: (T) => Mutation,
+                                    batchSize: Integer) = {
+    val tName = tableName.getName
+    dstream.foreachRDD((rdd, time) => {
+      bulkMutation(rdd, TableName.valueOf(tName), f, batchSize)
+    })
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.mapPartition method.
+   *
+   * It allow addition support for a user to take a RDD and generates a
+   * new RDD based on Gets and the results they bring back from HBase
+   *
+   * @param rdd     Original RDD with data to iterate over
+   * @param tableName        The name of the table to get from
+   * @param makeGet    function to convert a value in the RDD to a
+   *                   HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                   what ever the user wants to put in the resulting
+   *                   RDD
+   * return            new RDD that is created by the Get to HBase
+   */
+  def bulkGet[T, U: ClassTag](tableName: TableName,
+                    batchSize: Integer,
+                    rdd: RDD[T],
+                    makeGet: (T) => Get,
+                    convertResult: (Result) => U): RDD[U] = {
+
+    val getMapPartition = new GetMapPartition(tableName,
+      batchSize,
+      makeGet,
+      convertResult)
+
+    rdd.mapPartitions[U](it =>
+      hbaseMapPartition[T, U](
+        broadcastedConf,
+        it,
+        getMapPartition.run))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamMap method.
+   *
+   * It allow addition support for a user to take a DStream and
+   * generates a new DStream based on Gets and the results
+   * they bring back from HBase
+   *
+   * @param tableName     The name of the table to get from
+   * @param batchSize     The number of Gets to be sent in a single batch
+   * @param dStream       Original DStream with data to iterate over
+   * @param makeGet       Function to convert a value in the DStream to a
+   *                      HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                      what ever the user wants to put in the resulting
+   *                      DStream
+   * @return              A new DStream that is created by the Get to HBase
+   */
+  def streamBulkGet[T, U: ClassTag](tableName: TableName,
+                                    batchSize: Integer,
+                                    dStream: DStream[T],
+                                    makeGet: (T) => Get,
+                                    convertResult: (Result) => U): DStream[U] = {
+
+    val getMapPartition = new GetMapPartition(tableName,
+      batchSize,
+      makeGet,
+      convertResult)
+
+    dStream.mapPartitions[U](it => hbaseMapPartition[T, U](
+      broadcastedConf,
+      it,
+      getMapPartition.run))
+  }
+
+  /**
+   * This function will use the native HBase TableInputFormat with the
+   * given scan object to generate a new RDD
+   *
+   *  @param tableName the name of the table to scan
+   *  @param scan      the HBase scan object to use to read data from HBase
+   *  @param f         function to convert a Result object from HBase into
+   *                   what the user wants in the final generated RDD
+   *  @return          new RDD with results from scan
+   */
+  def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan,
+                            f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
+
+    val job: Job = Job.getInstance(getConf(broadcastedConf))
+
+    TableMapReduceUtil.initCredentials(job)
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
+      classOf[IdentityTableMapper], null, null, job)
+
+    sc.newAPIHadoopRDD(job.getConfiguration,
+      classOf[TableInputFormat],
+      classOf[ImmutableBytesWritable],
+      classOf[Result]).map(f)
+  }
+
+  /**
+   * A overloaded version of HBaseContext hbaseRDD that defines the
+   * type of the resulting RDD
+   *
+   *  @param tableName the name of the table to scan
+   *  @param scans     the HBase scan object to use to read data from HBase
+   *  @return          New RDD with results from scan
+   *
+   */
+  def hbaseRDD(tableName: TableName, scans: Scan):
+  RDD[(ImmutableBytesWritable, Result)] = {
+
+    hbaseRDD[(ImmutableBytesWritable, Result)](
+      tableName,
+      scans,
+      (r: (ImmutableBytesWritable, Result)) => r)
+  }
+
+  /**
+   *  underlining wrapper all foreach functions in HBaseContext
+   */
+  private def hbaseForeachPartition[T](configBroadcast:
+                                       Broadcast[SerializableWritable[Configuration]],
+                                        it: Iterator[T],
+                                        f: (Iterator[T], Connection) => Unit) = {
+
+    val config = getConf(configBroadcast)
+
+    applyCreds(configBroadcast)
+    // specify that this is a proxy user
+    val connection = ConnectionFactory.createConnection(config)
+    f(it, connection)
+    connection.close()
+  }
+
+  private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]):
+  Configuration = {
+
+    if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
+      val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
+      val inputStream = fs.open(new Path(tmpHdfsConfgFile))
+      tmpHdfsConfiguration = new Configuration(false)
+      tmpHdfsConfiguration.readFields(inputStream)
+      inputStream.close()
+    }
+
+    if (tmpHdfsConfiguration == null) {
+      try {
+        tmpHdfsConfiguration = configBroadcast.value.value
+      } catch {
+        case ex: Exception => logError("Unable to getConfig from broadcast", ex)
+      }
+    }
+    tmpHdfsConfiguration
+  }
+
+  /**
+   *  underlining wrapper all mapPartition functions in HBaseContext
+   *
+   */
+  private def hbaseMapPartition[K, U](
+                                       configBroadcast:
+                                       Broadcast[SerializableWritable[Configuration]],
+                                       it: Iterator[K],
+                                       mp: (Iterator[K], Connection) =>
+                                         Iterator[U]): Iterator[U] = {
+
+    val config = getConf(configBroadcast)
+    applyCreds(configBroadcast)
+
+    val connection = ConnectionFactory.createConnection(config)
+    val res = mp(it, connection)
+    connection.close()
+    res
+
+  }
+
+  /**
+   *  underlining wrapper all get mapPartition functions in HBaseContext
+   */
+  private class GetMapPartition[T, U](tableName: TableName,
+                                      batchSize: Integer,
+                                      makeGet: (T) => Get,
+                                      convertResult: (Result) => U)
+    extends Serializable {
+
+    val tName = tableName.getName
+
+    def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
+      val table = connection.getTable(TableName.valueOf(tName))
+
+      val gets = new java.util.ArrayList[Get]()
+      var res = List[U]()
+
+      while (iterator.hasNext) {
+        gets.add(makeGet(iterator.next()))
+
+        if (gets.size() == batchSize) {
+          val results = table.get(gets)
+          res = res ++ results.map(convertResult)
+          gets.clear()
+        }
+      }
+      if (gets.size() > 0) {
+        val results = table.get(gets)
+        res = res ++ results.map(convertResult)
+        gets.clear()
+      }
+      table.close()
+      res.iterator
+    }
+  }
+
+  /**
+   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+   *
+   * This method is used to keep ClassTags out of the external Java API, as
+   * the Java compiler cannot produce them automatically. While this
+   * ClassTag-faking does please the compiler, it can cause problems at runtime
+   * if the Scala API relies on ClassTags for correctness.
+   *
+   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
+   * just worse performance or security issues.
+   * For instance, an Array of AnyRef can hold any type T, but may lose primitive
+   * specialization.
+   */
+  private[spark]
+  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
new file mode 100644
index 0000000..d563a29
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * HBaseDStreamFunctions contains a set of implicit functions that can be
+ * applied to a Spark DStream so that we can easily interact with HBase
+ */
+object HBaseDStreamFunctions {
+
+  /**
+   * These are implicit methods for a DStream that contains any type of
+   * data.
+   *
+   * @param dStream  This is for dStreams of any type
+   * @tparam T       Type T
+   */
+  implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) {
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * put.  This will not return a new Stream.  Think of it like a foreach
+     *
+     * @param hc         The hbaseContext object to identify which
+     *                   HBase cluster connection to use
+     * @param tableName  The tableName that the put will be sent to
+     * @param f          The function that will turn the DStream values
+     *                   into HBase Put objects.
+     */
+    def hbaseBulkPut(hc: HBaseContext,
+                     tableName: TableName,
+                     f: (T) => Put): Unit = {
+      hc.streamBulkPut(dStream, tableName, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new DStream.  Think about it as a DStream map
+     * function.  In that every DStream value will get a new value out of
+     * HBase.  That new value will populate the newly generated DStream.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the put will be sent to
+     * @param batchSize      How many gets to execute in a single batch
+     * @param f              The function that will turn the RDD values
+     *                       in HBase Get objects
+     * @param convertResult  The function that will convert a HBase
+     *                       Result object into a value that will go
+     *                       into the resulting DStream
+     * @tparam R             The type of Object that will be coming
+     *                       out of the resulting DStream
+     * @return               A resulting DStream with type R objects
+     */
+    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+                     tableName: TableName,
+                     batchSize:Int, f: (T) => Get, convertResult: (Result) => R):
+    DStream[R] = {
+      hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new DStream.  Think about it as a DStream map
+     * function.  In that every DStream value will get a new value out of
+     * HBase.  That new value will populate the newly generated DStream.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the put will be sent to
+     * @param batchSize      How many gets to execute in a single batch
+     * @param f              The function that will turn the RDD values
+     *                       in HBase Get objects
+     * @return               A resulting DStream with type R objects
+     */
+    def hbaseBulkGet(hc: HBaseContext,
+                     tableName: TableName, batchSize:Int,
+                     f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = {
+        hc.streamBulkGet[T, (ImmutableBytesWritable, Result)](
+          tableName, batchSize, dStream, f,
+          result => (new ImmutableBytesWritable(result.getRow), result))
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * Delete.  This will not return a new DStream.
+     *
+     * @param hc         The hbaseContext object to identify which HBase
+     *                   cluster connection to use
+     * @param tableName  The tableName that the deletes will be sent to
+     * @param f          The function that will convert the DStream value into
+     *                   a HBase Delete Object
+     * @param batchSize  The number of Deletes to be sent in a single batch
+     */
+    def hbaseBulkDelete(hc: HBaseContext,
+                        tableName: TableName,
+                        f:(T) => Delete, batchSize:Int): Unit = {
+      hc.streamBulkDelete(dStream, tableName, f, batchSize)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * foreachPartition method.  This will ack very much like a normal DStream
+     * foreach method but for the fact that you will now have a HBase connection
+     * while iterating through the values.
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of an
+     *            DStream along with a connection object to HBase
+     */
+    def hbaseForeachPartition(hc: HBaseContext,
+                              f: (Iterator[T], Connection) => Unit): Unit = {
+      hc.streamForeachPartition(dStream, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * mapPartitions method.  This will ask very much like a normal DStream
+     * map partitions method but for the fact that you will now have a
+     * HBase connection while iterating through the values
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of an
+     *            DStream along with a connection object to HBase
+     * @tparam R  This is the type of objects that will go into the resulting
+     *            DStream
+     * @return    A resulting DStream of type R
+     */
+    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+                                        f: (Iterator[T], Connection) => Iterator[R]):
+    DStream[R] = {
+      hc.streamMapPartitions(dStream, f)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
new file mode 100644
index 0000000..fb8456d
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+/**
+ * HBaseRDDFunctions contains a set of implicit functions that can be
+ * applied to a Spark RDD so that we can easily interact with HBase
+ */
+object HBaseRDDFunctions
+{
+
+  /**
+   * These are implicit methods for a RDD that contains any type of
+   * data.
+   *
+   * @param rdd This is for rdd of any type
+   * @tparam T  This is any type
+   */
+  implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) {
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * put.  This will not return a new RDD.  Think of it like a foreach
+     *
+     * @param hc         The hbaseContext object to identify which
+     *                   HBase cluster connection to use
+     * @param tableName  The tableName that the put will be sent to
+     * @param f          The function that will turn the RDD values
+     *                   into HBase Put objects.
+     */
+    def hbaseBulkPut(hc: HBaseContext,
+                     tableName: TableName,
+                     f: (T) => Put): Unit = {
+      hc.bulkPut(rdd, tableName, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new RDD.  Think about it as a RDD map
+     * function.  In that every RDD value will get a new value out of
+     * HBase.  That new value will populate the newly generated RDD.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the put will be sent to
+     * @param batchSize      How many gets to execute in a single batch
+     * @param f              The function that will turn the RDD values
+     *                       in HBase Get objects
+     * @param convertResult  The function that will convert a HBase
+     *                       Result object into a value that will go
+     *                       into the resulting RDD
+     * @tparam R             The type of Object that will be coming
+     *                       out of the resulting RDD
+     * @return               A resulting RDD with type R objects
+     */
+    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+                            tableName: TableName, batchSize:Int,
+                            f: (T) => Get, convertResult: (Result) => R): RDD[R] = {
+      hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new RDD.  Think about it as a RDD map
+     * function.  In that every RDD value will get a new value out of
+     * HBase.  That new value will populate the newly generated RDD.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the put will be sent to
+     * @param batchSize      How many gets to execute in a single batch
+     * @param f              The function that will turn the RDD values
+     *                       in HBase Get objects
+     * @return               A resulting RDD with type R objects
+     */
+    def hbaseBulkGet(hc: HBaseContext,
+                                  tableName: TableName, batchSize:Int,
+                                  f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = {
+      hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName,
+        batchSize, rdd, f,
+        result => if (result != null && result.getRow != null) {
+          (new ImmutableBytesWritable(result.getRow), result)
+        } else {
+          null
+        })
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * Delete.  This will not return a new RDD.
+     *
+     * @param hc         The hbaseContext object to identify which HBase
+     *                   cluster connection to use
+     * @param tableName  The tableName that the deletes will be sent to
+     * @param f          The function that will convert the RDD value into
+     *                   a HBase Delete Object
+     * @param batchSize  The number of Deletes to be sent in a single batch
+     */
+    def hbaseBulkDelete(hc: HBaseContext,
+                        tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = {
+      hc.bulkDelete(rdd, tableName, f, batchSize)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * foreachPartition method.  This will ack very much like a normal RDD
+     * foreach method but for the fact that you will now have a HBase connection
+     * while iterating through the values.
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of an
+     *            RDD along with a connection object to HBase
+     */
+    def hbaseForeachPartition(hc: HBaseContext,
+                              f: (Iterator[T], Connection) => Unit): Unit = {
+      hc.foreachPartition(rdd, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * mapPartitions method.  This will ask very much like a normal RDD
+     * map partitions method but for the fact that you will now have a
+     * HBase connection while iterating through the values
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of an
+     *            RDD along with a connection object to HBase
+     * @tparam R  This is the type of objects that will go into the resulting
+     *            RDD
+     * @return    A resulting RDD of type R
+     */
+    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+                                        f: (Iterator[T], Connection) => Iterator[R]):
+    RDD[R] = {
+      hc.mapPartitions[T,R](rdd, f)
+    }
+  }
+}