Posted to commits@hbase.apache.org by bu...@apache.org on 2015/07/28 18:47:05 UTC
[3/3] hbase git commit: HBASE-13992 Integrate SparkOnHBase into HBase
HBASE-13992 Integrate SparkOnHBase into HBase
Signed-off-by: Sean Busbey <bu...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30f7d127
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30f7d127
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30f7d127
Branch: refs/heads/master
Commit: 30f7d127c3974cff9e3058e13d7c50805ee4482f
Parents: 6b9b7cb
Author: Ted Malaska <te...@cloudera.com>
Authored: Tue Jul 28 11:10:37 2015 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Tue Jul 28 11:45:23 2015 -0500
----------------------------------------------------------------------
dev-support/test-patch.properties | 3 +-
hbase-spark/pom.xml | 572 +++++++++++++++++++
.../JavaHBaseBulkDeleteExample.java | 80 +++
.../hbasecontext/JavaHBaseBulkGetExample.java | 115 ++++
.../hbasecontext/JavaHBaseBulkPutExample.java | 90 +++
.../hbasecontext/JavaHBaseDistributedScan.java | 81 +++
.../hbasecontext/JavaHBaseMapGetPutExample.java | 105 ++++
.../JavaHBaseStreamingBulkPutExample.java | 90 +++
.../hadoop/hbase/spark/HBaseContext.scala | 570 ++++++++++++++++++
.../hbase/spark/HBaseDStreamFunctions.scala | 158 +++++
.../hadoop/hbase/spark/HBaseRDDFunctions.scala | 162 ++++++
.../hadoop/hbase/spark/JavaHBaseContext.scala | 347 +++++++++++
.../hbasecontext/HBaseBulkDeleteExample.scala | 63 ++
.../hbasecontext/HBaseBulkGetExample.scala | 93 +++
.../hbasecontext/HBaseBulkPutExample.scala | 75 +++
.../HBaseBulkPutExampleFromFile.scala | 76 +++
.../HBaseBulkPutTimestampExample.scala | 77 +++
.../HBaseDistributedScanExample.scala | 61 ++
.../HBaseStreamingBulkPutExample.scala | 74 +++
.../example/rdd/HBaseBulkDeleteExample.scala | 64 +++
.../spark/example/rdd/HBaseBulkGetExample.scala | 88 +++
.../spark/example/rdd/HBaseBulkPutExample.scala | 76 +++
.../rdd/HBaseForeachPartitionExample.scala | 83 +++
.../example/rdd/HBaseMapPartitionExample.scala | 89 +++
.../hbase/spark/JavaHBaseContextSuite.java | 334 +++++++++++
.../hadoop/hbase/spark/HBaseContextSuite.scala | 344 +++++++++++
.../spark/HBaseDStreamFunctionsSuite.scala | 129 +++++
.../hbase/spark/HBaseRDDFunctionsSuite.scala | 398 +++++++++++++
pom.xml | 1 +
29 files changed, 4497 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/dev-support/test-patch.properties
----------------------------------------------------------------------
diff --git a/dev-support/test-patch.properties b/dev-support/test-patch.properties
index c652e3f..7e75965 100644
--- a/dev-support/test-patch.properties
+++ b/dev-support/test-patch.properties
@@ -21,7 +21,8 @@ MAVEN_OPTS="${MAVEN_OPTS:-"-Xmx3100M"}"
OK_RELEASEAUDIT_WARNINGS=0
# Allow four warnings. Javadoc complains about sun.misc.Unsafe use.
# See HBASE-7457, HBASE-13761
-OK_JAVADOC_WARNINGS=7
+# Allow 2 additional warnings for Scala stub notice about MR. See HBASE-13992
+OK_JAVADOC_WARNINGS=9
MAX_LINE_LENGTH=100
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml
new file mode 100644
index 0000000..e48f9e8
--- /dev/null
+++ b/hbase-spark/pom.xml
@@ -0,0 +1,572 @@
+<?xml version="1.0"?>
+
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation=
+ "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>hbase</artifactId>
+ <groupId>org.apache.hbase</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <artifactId>hbase-spark</artifactId>
+ <name>Apache HBase - Spark</name>
+
+ <properties>
+ <spark.version>1.3.0</spark.version>
+ <scala.version>2.10.4</scala.version>
+ <scala.binary.version>2.10</scala.binary.version>
+ <top.dir>${project.basedir}/..</top.dir>
+ </properties>
+
+ <dependencies>
+ <!-- Force import of Spark's servlet API for unit tests -->
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ <version>3.0.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Mark Spark / Scala as provided -->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <!-- make sure wrong scala version is not pulled in -->
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- make sure wrong scala version is not pulled in -->
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scalap</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <type>test-jar</type>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <version>2.2.4</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalamock</groupId>
+ <artifactId>scalamock-scalatest-support_${scala.binary.version}</artifactId>
+ <version>3.1.4</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop-two.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop-two.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop-two.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop-two.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>thrift</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api-2.5</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>thrift</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api-2.5</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop2-compat</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>thrift</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api-2.5</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-it</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+
+ <build>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.0</version>
+ <configuration>
+ <charset>${project.build.sourceEncoding}</charset>
+ <scalaVersion>${scala.version}</scalaVersion>
+ </configuration>
+ <executions>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>1.0</version>
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <filereports>WDF TestSuite.txt</filereports>
+ <parallel>false</parallel>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </execution>
+ <execution>
+ <id>integration-test</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <tagsToExclude>Integration-Test</tagsToExclude>
+ <argLine>
+ -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ </argLine>
+ <parallel>false</parallel>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java
new file mode 100644
index 0000000..68b2edd
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is a simple example of deleting records in HBase
+ * with the bulkDelete function.
+ */
+final public class JavaHBaseBulkDeleteExample {
+
+ private JavaHBaseBulkDeleteExample() {}
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ System.out.println("JavaHBaseBulkDeleteExample {tableName}");
+ return;
+ }
+
+ String tableName = args[0];
+
+ SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkDeleteExample " + tableName);
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+ try {
+ List<byte[]> list = new ArrayList<>();
+ list.add(Bytes.toBytes("1"));
+ list.add(Bytes.toBytes("2"));
+ list.add(Bytes.toBytes("3"));
+ list.add(Bytes.toBytes("4"));
+ list.add(Bytes.toBytes("5"));
+
+ JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+ Configuration conf = HBaseConfiguration.create();
+
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ hbaseContext.bulkDelete(rdd,
+ TableName.valueOf(tableName), new DeleteFunction(), 4);
+ } finally {
+ jsc.stop();
+ }
+
+ }
+
+ public static class DeleteFunction implements Function<byte[], Delete> {
+ private static final long serialVersionUID = 1L;
+ public Delete call(byte[] v) throws Exception {
+ return new Delete(v);
+ }
+ }
+}
\ No newline at end of file
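For comparison, the same delete flow through the Scala HBaseContext API added later in this patch (HBaseContext.bulkDelete) might look roughly like the sketch below. The sketch is illustrative only, not part of the commit; it assumes a running Spark deployment and an existing table named "t1".

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Delete
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    object HBaseBulkDeleteSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("HBaseBulkDeleteSketch"))
        try {
          // HBaseContext broadcasts the HBase configuration to the executors once.
          val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
          // Row keys to delete, distributed as an RDD of byte arrays.
          val rdd = sc.parallelize(Seq("1", "2", "3", "4", "5")).map(s => Bytes.toBytes(s))
          // bulkDelete(rdd, table, rowKey => Delete, batchSize)
          hbaseContext.bulkDelete[Array[Byte]](rdd,
            TableName.valueOf("t1"),
            (rowKey: Array[Byte]) => new Delete(rowKey),
            4)
        } finally {
          sc.stop()
        }
      }
    }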
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java
new file mode 100644
index 0000000..c7dcbb6
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * This is a simple example of getting records in HBase
+ * with the bulkGet function.
+ */
+final public class JavaHBaseBulkGetExample {
+
+ private JavaHBaseBulkGetExample() {}
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ System.out.println("JavaHBaseBulkGetExample {tableName}");
+ return;
+ }
+
+ String tableName = args[0];
+
+ SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName);
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+ try {
+ List<byte[]> list = new ArrayList<>();
+ list.add(Bytes.toBytes("1"));
+ list.add(Bytes.toBytes("2"));
+ list.add(Bytes.toBytes("3"));
+ list.add(Bytes.toBytes("4"));
+ list.add(Bytes.toBytes("5"));
+
+ JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+ Configuration conf = HBaseConfiguration.create();
+
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd, new GetFunction(),
+ new ResultFunction());
+ } finally {
+ jsc.stop();
+ }
+ }
+
+ public static class GetFunction implements Function<byte[], Get> {
+
+ private static final long serialVersionUID = 1L;
+
+ public Get call(byte[] v) throws Exception {
+ return new Get(v);
+ }
+ }
+
+ public static class ResultFunction implements Function<Result, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ public String call(Result result) throws Exception {
+ Iterator<Cell> it = result.listCells().iterator();
+ StringBuilder b = new StringBuilder();
+
+ b.append(Bytes.toString(result.getRow())).append(":");
+
+ while (it.hasNext()) {
+ Cell cell = it.next();
+ String q = Bytes.toString(cell.getQualifierArray());
+ if (q.equals("counter")) {
+ b.append("(")
+ .append(Bytes.toString(cell.getQualifierArray()))
+ .append(",")
+ .append(Bytes.toLong(cell.getValueArray()))
+ .append(")");
+ } else {
+ b.append("(")
+ .append(Bytes.toString(cell.getQualifierArray()))
+ .append(",")
+ .append(Bytes.toString(cell.getValueArray()))
+ .append(")");
+ }
+ }
+ return b.toString();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
new file mode 100644
index 0000000..ded5081
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * This is a simple example of putting records in HBase
+ * with the bulkPut function.
+ */
+final public class JavaHBaseBulkPutExample {
+
+ private JavaHBaseBulkPutExample() {}
+
+ public static void main(String[] args) {
+ if (args.length < 2) {
+ System.out.println("JavaHBaseBulkPutExample " +
+ "{tableName} {columnFamily}");
+ return;
+ }
+
+ String tableName = args[0];
+ String columnFamily = args[1];
+
+ SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkPutExample " + tableName);
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+ try {
+ List<String> list = new ArrayList<>();
+ list.add("1," + columnFamily + ",a,1");
+ list.add("2," + columnFamily + ",a,2");
+ list.add("3," + columnFamily + ",a,3");
+ list.add("4," + columnFamily + ",a,4");
+ list.add("5," + columnFamily + ",a,5");
+
+ JavaRDD<String> rdd = jsc.parallelize(list);
+
+ Configuration conf = HBaseConfiguration.create();
+
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ hbaseContext.bulkPut(rdd,
+ TableName.valueOf(tableName),
+ new PutFunction());
+ } finally {
+ jsc.stop();
+ }
+ }
+
+ public static class PutFunction implements Function<String, Put> {
+
+ private static final long serialVersionUID = 1L;
+
+ public Put call(String v) throws Exception {
+ String[] cells = v.split(",");
+ Put put = new Put(Bytes.toBytes(cells[0]));
+
+ put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
+ Bytes.toBytes(cells[3]));
+ return put;
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
new file mode 100644
index 0000000..6192ad9
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.apache.spark.api.java.function.Function;
+import scala.Tuple2;
+
+/**
+ * This is a simple example of scanning records from HBase
+ * with the hbaseRDD function.
+ */
+final public class JavaHBaseDistributedScan {
+
+ private JavaHBaseDistributedScan() {}
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ System.out.println("JavaHBaseDistributedScan {tableName}");
+ return;
+ }
+
+ String tableName = args[0];
+
+ SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseDistributedScan " + tableName);
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+ try {
+ Configuration conf = HBaseConfiguration.create();
+
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ Scan scan = new Scan();
+ scan.setCaching(100);
+
+ JavaRDD<Tuple2<ImmutableBytesWritable, Result>> javaRdd =
+ hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan);
+
+ List<String> results = javaRdd.map(new ScanConvertFunction()).collect();
+
+ System.out.println("Result Size: " + results.size());
+ } finally {
+ jsc.stop();
+ }
+ }
+
+ private static class ScanConvertFunction implements
+ Function<Tuple2<ImmutableBytesWritable, Result>, String> {
+ @Override
+ public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
+ return Bytes.toString(v1._1().copyBytes());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
new file mode 100644
index 0000000..0d41a70
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+
+import scala.Tuple2;
+
+/**
+ * This is a simple example of using the foreachPartition
+ * method with an HBase connection
+ */
+final public class JavaHBaseMapGetPutExample {
+
+ private JavaHBaseMapGetPutExample() {}
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ System.out.println("JavaHBaseBulkGetExample {tableName}");
+ return;
+ }
+
+ final String tableName = args[0];
+
+ SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseMapGetPutExample " + tableName);
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+ try {
+ List<byte[]> list = new ArrayList<>();
+ list.add(Bytes.toBytes("1"));
+ list.add(Bytes.toBytes("2"));
+ list.add(Bytes.toBytes("3"));
+ list.add(Bytes.toBytes("4"));
+ list.add(Bytes.toBytes("5"));
+
+ JavaRDD<byte[]> rdd = jsc.parallelize(list);
+ Configuration conf = HBaseConfiguration.create();
+
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ hbaseContext.foreachPartition(rdd,
+ new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
+ public void call(Tuple2<Iterator<byte[]>, Connection> t)
+ throws Exception {
+ Table table = t._2().getTable(TableName.valueOf(tableName));
+ BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
+
+ while (t._1().hasNext()) {
+ byte[] b = t._1().next();
+ Result r = table.get(new Get(b));
+ if (r.getExists()) {
+ mutator.mutate(new Put(b));
+ }
+ }
+
+ mutator.flush();
+ mutator.close();
+ table.close();
+ }
+ });
+ } finally {
+ jsc.stop();
+ }
+ }
+
+ public static class GetFunction implements Function<byte[], Get> {
+ private static final long serialVersionUID = 1L;
+ public Get call(byte[] v) throws Exception {
+ return new Get(v);
+ }
+ }
+}
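A rough Scala counterpart using HBaseContext.foreachPartition (defined later in this patch) is sketched below. It is illustrative only, not part of the commit; it assumes a table "t1" with a column family "c", and the marker cell it writes back is purely an example.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{Connection, Get, Put}
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    object HBaseForeachPartitionSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("HBaseForeachPartitionSketch"))
        try {
          val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
          val rdd = sc.parallelize(Seq("1", "2", "3", "4", "5")).map(s => Bytes.toBytes(s))
          // The function receives the partition's iterator plus an open Connection;
          // HBaseContext manages the Connection's life cycle, so do not close it.
          hbaseContext.foreachPartition(rdd,
            (it: Iterator[Array[Byte]], connection: Connection) => {
              val table = connection.getTable(TableName.valueOf("t1"))
              val mutator = connection.getBufferedMutator(TableName.valueOf("t1"))
              it.foreach { rowKey =>
                // If the row exists, write an illustrative marker cell back to it.
                if (!table.get(new Get(rowKey)).isEmpty) {
                  mutator.mutate(new Put(rowKey)
                    .addColumn(Bytes.toBytes("c"), Bytes.toBytes("seen"), Bytes.toBytes("true")))
                }
              }
              mutator.flush()
              mutator.close()
              table.close()
            })
        } finally {
          sc.stop()
        }
      }
    }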
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
new file mode 100644
index 0000000..cd4cf24
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ * This is a simple example of BulkPut with Spark Streaming
+ */
+final public class JavaHBaseStreamingBulkPutExample {
+
+ private JavaHBaseStreamingBulkPutExample() {}
+
+ public static void main(String[] args) {
+ if (args.length < 4) {
+ System.out.println("JavaHBaseBulkPutExample " +
+ "{host} {port} {tableName}");
+ return;
+ }
+
+ String host = args[0];
+ String port = args[1];
+ String tableName = args[2];
+
+ SparkConf sparkConf =
+ new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
+ tableName + ":" + port + ":" + tableName);
+
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+ try {
+ JavaStreamingContext jssc =
+ new JavaStreamingContext(jsc, new Duration(1000));
+
+ JavaReceiverInputDStream<String> javaDstream =
+ jssc.socketTextStream(host, Integer.parseInt(port));
+
+ Configuration conf = HBaseConfiguration.create();
+
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ hbaseContext.streamBulkPut(javaDstream,
+ TableName.valueOf(tableName),
+ new PutFunction());
+ } finally {
+ jsc.stop();
+ }
+ }
+
+ public static class PutFunction implements Function<String, Put> {
+
+ private static final long serialVersionUID = 1L;
+
+ public Put call(String v) throws Exception {
+ String[] part = v.split(",");
+ Put put = new Put(Bytes.toBytes(part[0]));
+
+ put.addColumn(Bytes.toBytes(part[1]),
+ Bytes.toBytes(part[2]),
+ Bytes.toBytes(part[3]));
+ return put;
+ }
+
+ }
+}
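A rough Scala counterpart using HBaseContext.streamBulkPut is sketched below. It is illustrative only, not part of the commit; the socket source localhost:9999, the table name "t1", and the comma-separated input format are assumptions carried over from the Java example above, not requirements of the API.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object HBaseStreamBulkPutSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("HBaseStreamBulkPutSketch"))
        val ssc = new StreamingContext(sc, Seconds(1))
        // Each line is assumed to arrive as "rowKey,columnFamily,qualifier,value".
        val lines = ssc.socketTextStream("localhost", 9999)
        val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
        // One Put per received line, written on every batch interval.
        hbaseContext.streamBulkPut[String](lines,
          TableName.valueOf("t1"),
          (line: String) => {
            val parts = line.split(",")
            new Put(Bytes.toBytes(parts(0)))
              .addColumn(Bytes.toBytes(parts(1)), Bytes.toBytes(parts(2)), Bytes.toBytes(parts(3)))
          })
        ssc.start()
        ssc.awaitTermination()
      }
    }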
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
new file mode 100644
index 0000000..f060fea
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.ConnectionFactory
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.client.Get
+import org.apache.hadoop.hbase.client.Result
+import scala.reflect.ClassTag
+import org.apache.hadoop.hbase.client.Connection
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.client.Delete
+import org.apache.spark.{Logging, SerializableWritable, SparkContext}
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.hbase.client.Mutation
+import org.apache.spark.streaming.dstream.DStream
+import java.io._
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat
+import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+/**
+ * HBaseContext is a façade for HBase operations
+ * like bulk put, get, increment, delete, and scan
+ *
+ * HBaseContext will take the responsibilities
+ * of disseminating the configuration information
+ * to the workers and managing the life cycle of HConnections.
+ */
+class HBaseContext(@transient sc: SparkContext,
+ @transient config: Configuration,
+ val tmpHdfsConfgFile: String = null)
+ extends Serializable with Logging {
+
+ @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+ @transient var tmpHdfsConfiguration:Configuration = config
+ @transient var appliedCredentials = false
+ @transient val job = Job.getInstance(config)
+ TableMapReduceUtil.initCredentials(job)
+ val broadcastedConf = sc.broadcast(new SerializableWritable(config))
+ val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials))
+
+ if (tmpHdfsConfgFile != null && config != null) {
+ val fs = FileSystem.newInstance(config)
+ val tmpPath = new Path(tmpHdfsConfgFile)
+ if (!fs.exists(tmpPath)) {
+ val outputStream = fs.create(tmpPath)
+ config.write(outputStream)
+ outputStream.close()
+ } else {
+ logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!")
+ }
+ }
+
+ /**
+ * A simple enrichment of the traditional Spark RDD foreachPartition.
+ * This function differs from the original in that it offers the
+ * developer access to an already connected HConnection object
+ *
+ * Note: Do not close the HConnection object. All HConnection
+ * management is handled outside this method
+ *
+ * @param rdd Original RDD with data to iterate over
+ * @param f Function to be given an iterator to iterate through
+ * the RDD values and an HConnection object to interact
+ * with HBase
+ */
+ def foreachPartition[T](rdd: RDD[T],
+ f: (Iterator[T], Connection) => Unit):Unit = {
+ rdd.foreachPartition(
+ it => hbaseForeachPartition(broadcastedConf, it, f))
+ }
+
+ /**
+ * A simple enrichment of the traditional Spark Streaming dStream foreach
+ * This function differs from the original in that it offers the
+ * developer access to an already connected HConnection object
+ *
+ * Note: Do not close the HConnection object. All HConnection
+ * management is handled outside this method
+ *
+ * @param dstream Original DStream with data to iterate over
+ * @param f Function to be given an iterator to iterate through
+ * the DStream values and an HConnection object to
+ * interact with HBase
+ */
+ def foreachPartition[T](dstream: DStream[T],
+ f: (Iterator[T], Connection) => Unit):Unit = {
+ dstream.foreachRDD((rdd, time) => {
+ foreachPartition(rdd, f)
+ })
+ }
+
+ /**
+ * A simple enrichment of the traditional Spark RDD mapPartition.
+ * This function differs from the original in that it offers the
+ * developer access to an already connected HConnection object
+ *
+ * Note: Do not close the HConnection object. All HConnection
+ * management is handled outside this method
+ *
+ * @param rdd Original RDD with data to iterate over
+ * @param mp Function to be given an iterator to iterate through
+ * the RDD values and an HConnection object to interact
+ * with HBase
+ * @return Returns a new RDD generated by the user-defined
+ * function just like normal mapPartition
+ */
+ def mapPartitions[T, R: ClassTag](rdd: RDD[T],
+ mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = {
+
+ rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
+ it,
+ mp))
+
+ }
+
+ /**
+ * A simple enrichment of the traditional Spark Streaming DStream
+ * foreachPartition.
+ *
+ * This function differs from the original in that it offers the
+ * developer access to an already connected HConnection object
+ *
+ * Note: Do not close the HConnection object. All HConnection
+ * management is handled outside this method
+ *
+ * Note: Make sure to partition correctly to avoid memory issues when
+ * getting data from HBase
+ *
+ * @param dstream Original DStream with data to iterate over
+ * @param f Function to be given an iterator to iterate through
+ * the DStream values and an HConnection object to
+ * interact with HBase
+ */
+ def streamForeachPartition[T](dstream: DStream[T],
+ f: (Iterator[T], Connection) => Unit): Unit = {
+
+ dstream.foreachRDD(rdd => this.foreachPartition(rdd, f))
+ }
+
+ /**
+ * A simple enrichment of the traditional Spark Streaming DStream
+ * mapPartition.
+ *
+ * This function differs from the original in that it offers the
+ * developer access to an already connected HConnection object
+ *
+ * Note: Do not close the HConnection object. All HConnection
+ * management is handled outside this method
+ *
+ * Note: Make sure to partition correctly to avoid memory issues when
+ * getting data from HBase
+ *
+ * @param dstream Original DStream with data to iterate over
+ * @param f Function to be given an iterator to iterate through
+ * the DStream values and an HConnection object to
+ * interact with HBase
+ * @return Returns a new DStream generated by the user-defined
+ * function just like normal mapPartition
+ */
+ def streamMapPartitions[T, U: ClassTag](dstream: DStream[T],
+ f: (Iterator[T], Connection) => Iterator[U]):
+ DStream[U] = {
+ dstream.mapPartitions(it => hbaseMapPartition[T, U](
+ broadcastedConf,
+ it,
+ f))
+ }
+
+ /**
+ * A simple abstraction over the HBaseContext.foreachPartition method.
+ *
+ * It allows a user to take an RDD,
+ * generate Puts, and send them to HBase.
+ * The complexity of managing the HConnection is
+ * removed from the developer
+ *
+ * @param rdd Original RDD with data to iterate over
+ * @param tableName The name of the table to put into
+ * @param f Function to convert a value in the RDD to an HBase Put
+ */
+ def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) {
+
+ val tName = tableName.getName
+ rdd.foreachPartition(
+ it => hbaseForeachPartition[T](
+ broadcastedConf,
+ it,
+ (iterator, connection) => {
+ val m = connection.getBufferedMutator(TableName.valueOf(tName))
+ iterator.foreach(T => m.mutate(f(T)))
+ m.flush()
+ m.close()
+ }))
+ }
+
+ def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){
+ credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+
+ logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials)
+
+ if (!appliedCredentials && credentials != null) {
+ appliedCredentials = true
+
+ @transient val ugi = UserGroupInformation.getCurrentUser
+ ugi.addCredentials(credentials)
+ // specify that this is a proxy user
+ ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
+
+ ugi.addCredentials(credentialsConf.value.value)
+ }
+ }
+
+ /**
+ * A simple abstraction over the HBaseContext.streamMapPartition method.
+ *
+ * It allows a user to take a DStream,
+ * generate Puts, and send them to HBase.
+ *
+ * The complexity of managing the HConnection is
+ * removed from the developer
+ *
+ * @param dstream Original DStream with data to iterate over
+ * @param tableName The name of the table to put into
+ * @param f Function to convert a value in
+ * the DStream to an HBase Put
+ */
+ def streamBulkPut[T](dstream: DStream[T],
+ tableName: TableName,
+ f: (T) => Put) = {
+ val tName = tableName.getName
+ dstream.foreachRDD((rdd, time) => {
+ bulkPut(rdd, TableName.valueOf(tName), f)
+ })
+ }
+
+ /**
+ * A simple abstraction over the HBaseContext.foreachPartition method.
+ *
+ * It allows a user to take an RDD, generate Deletes,
+ * and send them to HBase. The complexity of managing the HConnection is
+ * removed from the developer
+ *
+ * @param rdd Original RDD with data to iterate over
+ * @param tableName The name of the table to delete from
+ * @param f Function to convert a value in the RDD to an
+ * HBase Delete
+ * @param batchSize The number of deletes to batch before sending to HBase
+ */
+ def bulkDelete[T](rdd: RDD[T], tableName: TableName,
+ f: (T) => Delete, batchSize: Integer) {
+ bulkMutation(rdd, tableName, f, batchSize)
+ }
+
+ /**
+ * A simple abstraction over the HBaseContext.streamBulkMutation method.
+ *
+ * It allows a user to take a DStream,
+ * generate Deletes, and send them to HBase.
+ *
+ * The complexity of managing the HConnection is
+ * removed from the developer
+ *
+ * @param dstream Original DStream with data to iterate over
+ * @param tableName The name of the table to delete from
+ * @param f Function to convert a value in the DStream to an
+ * HBase Delete
+ * @param batchSize The number of deletes to batch before sending to HBase
+ */
+ def streamBulkDelete[T](dstream: DStream[T],
+ tableName: TableName,
+ f: (T) => Delete,
+ batchSize: Integer) = {
+ streamBulkMutation(dstream, tableName, f, batchSize)
+ }
+
+ /**
+ * Underlying function to support all bulk mutations
+ *
+ * May be opened up if requested
+ */
+ private def bulkMutation[T](rdd: RDD[T], tableName: TableName,
+ f: (T) => Mutation, batchSize: Integer) {
+
+ val tName = tableName.getName
+ rdd.foreachPartition(
+ it => hbaseForeachPartition[T](
+ broadcastedConf,
+ it,
+ (iterator, connection) => {
+ val table = connection.getTable(TableName.valueOf(tName))
+ val mutationList = new java.util.ArrayList[Mutation]
+ iterator.foreach(T => {
+ mutationList.add(f(T))
+ if (mutationList.size >= batchSize) {
+ table.batch(mutationList, null)
+ mutationList.clear()
+ }
+ })
+ if (mutationList.size() > 0) {
+ table.batch(mutationList, null)
+ mutationList.clear()
+ }
+ table.close()
+ }))
+ }
+
+ /**
+ * Underlying function to support all bulk streaming mutations
+ *
+ * May be opened up if requested
+ */
+ private def streamBulkMutation[T](dstream: DStream[T],
+ tableName: TableName,
+ f: (T) => Mutation,
+ batchSize: Integer) = {
+ val tName = tableName.getName
+ dstream.foreachRDD((rdd, time) => {
+ bulkMutation(rdd, TableName.valueOf(tName), f, batchSize)
+ })
+ }
+
+ /**
+ * A simple abstraction over the HBaseContext.mapPartition method.
+ *
+ * It allows a user to take an RDD and generate a
+ * new RDD based on Gets and the results they bring back from HBase
+ *
+ * @param rdd Original RDD with data to iterate over
+ * @param tableName The name of the table to get from
+ * @param makeGet Function to convert a value in the RDD to an
+ * HBase Get
+ * @param convertResult This will convert the HBase Result object to
+ * whatever the user wants to put in the resulting
+ * RDD
+ * @return New RDD that is created by the Gets to HBase
+ */
+ def bulkGet[T, U: ClassTag](tableName: TableName,
+ batchSize: Integer,
+ rdd: RDD[T],
+ makeGet: (T) => Get,
+ convertResult: (Result) => U): RDD[U] = {
+
+ val getMapPartition = new GetMapPartition(tableName,
+ batchSize,
+ makeGet,
+ convertResult)
+
+ rdd.mapPartitions[U](it =>
+ hbaseMapPartition[T, U](
+ broadcastedConf,
+ it,
+ getMapPartition.run))
+ }
+
+ /**
+ * A simple abstraction over the HBaseContext.streamMap method.
+ *
+ * It allows a user to take a DStream and
+ * generate a new DStream based on Gets and the results
+ * they bring back from HBase
+ *
+ * @param tableName The name of the table to get from
+ * @param batchSize The number of Gets to be sent in a single batch
+ * @param dStream Original DStream with data to iterate over
+ * @param makeGet Function to convert a value in the DStream to an
+ * HBase Get
+ * @param convertResult This will convert the HBase Result object to
+ * whatever the user wants to put in the resulting
+ * DStream
+ * @return A new DStream that is created by the Get to HBase
+ */
+ def streamBulkGet[T, U: ClassTag](tableName: TableName,
+ batchSize: Integer,
+ dStream: DStream[T],
+ makeGet: (T) => Get,
+ convertResult: (Result) => U): DStream[U] = {
+
+ val getMapPartition = new GetMapPartition(tableName,
+ batchSize,
+ makeGet,
+ convertResult)
+
+ dStream.mapPartitions[U](it => hbaseMapPartition[T, U](
+ broadcastedConf,
+ it,
+ getMapPartition.run))
+ }
+
+ /**
+ * This function will use the native HBase TableInputFormat with the
+ * given scan object to generate a new RDD
+ *
+ * @param tableName the name of the table to scan
+ * @param scan the HBase scan object to use to read data from HBase
+ * @param f function to convert a Result object from HBase into
+ * what the user wants in the final generated RDD
+ * @return new RDD with results from scan
+ */
+ def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan,
+ f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
+
+ val job: Job = Job.getInstance(getConf(broadcastedConf))
+
+ TableMapReduceUtil.initCredentials(job)
+ TableMapReduceUtil.initTableMapperJob(tableName, scan,
+ classOf[IdentityTableMapper], null, null, job)
+
+ sc.newAPIHadoopRDD(job.getConfiguration,
+ classOf[TableInputFormat],
+ classOf[ImmutableBytesWritable],
+ classOf[Result]).map(f)
+ }
+
+ /**
+ * An overloaded version of HBaseContext hbaseRDD that defines the
+ * type of the resulting RDD
+ *
+ * @param tableName the name of the table to scan
+ * @param scans the HBase scan object to use to read data from HBase
+ * @return New RDD with results from scan
+ *
+ */
+ def hbaseRDD(tableName: TableName, scans: Scan):
+ RDD[(ImmutableBytesWritable, Result)] = {
+
+ hbaseRDD[(ImmutableBytesWritable, Result)](
+ tableName,
+ scans,
+ (r: (ImmutableBytesWritable, Result)) => r)
+ }
+
+ /**
+ * Underlying wrapper for all foreach functions in HBaseContext
+ */
+ private def hbaseForeachPartition[T](configBroadcast:
+ Broadcast[SerializableWritable[Configuration]],
+ it: Iterator[T],
+ f: (Iterator[T], Connection) => Unit) = {
+
+ val config = getConf(configBroadcast)
+
+ applyCreds(configBroadcast)
+ // specify that this is a proxy user
+ val connection = ConnectionFactory.createConnection(config)
+ f(it, connection)
+ connection.close()
+ }
+
+ private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]):
+ Configuration = {
+
+ if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
+ val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
+ val inputStream = fs.open(new Path(tmpHdfsConfgFile))
+ tmpHdfsConfiguration = new Configuration(false)
+ tmpHdfsConfiguration.readFields(inputStream)
+ inputStream.close()
+ }
+
+ if (tmpHdfsConfiguration == null) {
+ try {
+ tmpHdfsConfiguration = configBroadcast.value.value
+ } catch {
+ case ex: Exception => logError("Unable to getConfig from broadcast", ex)
+ }
+ }
+ tmpHdfsConfiguration
+ }
+
+ /**
+ * Underlying wrapper for all mapPartition functions in HBaseContext
+ *
+ */
+ private def hbaseMapPartition[K, U](
+ configBroadcast:
+ Broadcast[SerializableWritable[Configuration]],
+ it: Iterator[K],
+ mp: (Iterator[K], Connection) =>
+ Iterator[U]): Iterator[U] = {
+
+ val config = getConf(configBroadcast)
+ applyCreds(configBroadcast)
+
+ val connection = ConnectionFactory.createConnection(config)
+ val res = mp(it, connection)
+ connection.close()
+ res
+
+ }
+
+ /**
+ * Underlying wrapper for all bulk get mapPartition functions in HBaseContext
+ */
+ private class GetMapPartition[T, U](tableName: TableName,
+ batchSize: Integer,
+ makeGet: (T) => Get,
+ convertResult: (Result) => U)
+ extends Serializable {
+
+ val tName = tableName.getName
+
+ def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
+ val table = connection.getTable(TableName.valueOf(tName))
+
+ val gets = new java.util.ArrayList[Get]()
+ var res = List[U]()
+
+ while (iterator.hasNext) {
+ gets.add(makeGet(iterator.next()))
+
+ if (gets.size() == batchSize) {
+ val results = table.get(gets)
+ res = res ++ results.map(convertResult)
+ gets.clear()
+ }
+ }
+ if (gets.size() > 0) {
+ val results = table.get(gets)
+ res = res ++ results.map(convertResult)
+ gets.clear()
+ }
+ table.close()
+ res.iterator
+ }
+ }
+
+ /**
+ * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+ *
+ * This method is used to keep ClassTags out of the external Java API, as
+ * the Java compiler cannot produce them automatically. While this
+ * ClassTag-faking does please the compiler, it can cause problems at runtime
+ * if the Scala API relies on ClassTags for correctness.
+ *
+ * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
+ * just worse performance or security issues.
+ * For instance, an Array of AnyRef can hold any type T, but may lose primitive
+ * specialization.
+ */
+ private[spark]
+ def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
new file mode 100644
index 0000000..d563a29
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * HBaseDStreamFunctions contains a set of implicit functions that can be
+ * applied to a Spark DStream so that we can easily interact with HBase
+ */
+object HBaseDStreamFunctions {
+
+ /**
+ * These are implicit methods for a DStream that contains any type of
+ * data.
+ *
+ * @param dStream This is for dStreams of any type
+ * @tparam T Type T
+ */
+ implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) {
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * put. This will not return a new DStream. Think of it like a foreach.
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the put will be sent to
+ * @param f The function that will turn the DStream values
+ * into HBase Put objects.
+ */
+ def hbaseBulkPut(hc: HBaseContext,
+ tableName: TableName,
+ f: (T) => Put): Unit = {
+ hc.streamBulkPut(dStream, tableName, f)
+ }
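+ // Usage sketch (hypothetical names: an HBaseContext `hc` and a DStream of
+ // (rowKey, value) byte-array pairs `dstream`):
+ //   import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
+ //   dstream.hbaseBulkPut(hc, TableName.valueOf("t1"), t =>
+ //     new Put(t._1).addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), t._2))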
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * get. This will return a new DStream. Think of it as a DStream map
+ * function, in that every DStream value will get a new value out of
+ * HBase. That new value will populate the newly generated DStream.
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the Gets will be sent to
+ * @param batchSize How many gets to execute in a single batch
+ * @param f The function that will turn the DStream values
+ * into HBase Get objects
+ * @param convertResult The function that will convert a HBase
+ * Result object into a value that will go
+ * into the resulting DStream
+ * @tparam R The type of Object that will be coming
+ * out of the resulting DStream
+ * @return A resulting DStream with type R objects
+ */
+ def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+ tableName: TableName,
+ batchSize:Int, f: (T) => Get, convertResult: (Result) => R):
+ DStream[R] = {
+ hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult)
+ }
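+ // Usage sketch (hypothetical names: an HBaseContext `hc` and a DStream of
+ // row keys `keyDStream`):
+ //   val valueDStream = keyDStream.hbaseBulkGet[String](
+ //     hc, TableName.valueOf("t1"), 100,
+ //     rowKey => new Get(rowKey),
+ //     result => Bytes.toString(result.value()))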
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * get. This will return a new DStream. Think of it as a DStream map
+ * function, in that every DStream value will get a new value out of
+ * HBase. That new value will populate the newly generated DStream.
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the Gets will be sent to
+ * @param batchSize How many gets to execute in a single batch
+ * @param f The function that will turn the DStream values
+ * into HBase Get objects
+ * @return A resulting DStream of (ImmutableBytesWritable, Result) tuples
+ */
+ def hbaseBulkGet(hc: HBaseContext,
+ tableName: TableName, batchSize:Int,
+ f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = {
+ hc.streamBulkGet[T, (ImmutableBytesWritable, Result)](
+ tableName, batchSize, dStream, f,
+ result => (new ImmutableBytesWritable(result.getRow), result))
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * Delete. This will not return a new DStream.
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param tableName The tableName that the deletes will be sent to
+ * @param f The function that will convert the DStream value into
+ * a HBase Delete Object
+ * @param batchSize The number of Deletes to be sent in a single batch
+ */
+ def hbaseBulkDelete(hc: HBaseContext,
+ tableName: TableName,
+ f:(T) => Delete, batchSize:Int): Unit = {
+ hc.streamBulkDelete(dStream, tableName, f, batchSize)
+ }
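+ // Usage sketch (hypothetical names: an HBaseContext `hc` and a DStream of
+ // row keys `keyDStream`):
+ //   keyDStream.hbaseBulkDelete(hc, TableName.valueOf("t1"),
+ //     rowKey => new Delete(rowKey), 100)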
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's
+ * foreachPartition method. This will act very much like a normal DStream
+ * foreach method but for the fact that you will now have a HBase connection
+ * while iterating through the values.
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param f This function will get an iterator for a partition of the
+ * DStream along with a connection object to HBase
+ */
+ def hbaseForeachPartition(hc: HBaseContext,
+ f: (Iterator[T], Connection) => Unit): Unit = {
+ hc.streamForeachPartition(dStream, f)
+ }
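+ // Usage sketch (hypothetical names: an HBaseContext `hc` and a DStream of
+ // (rowKey, value) byte-array pairs `dstream`):
+ //   dstream.hbaseForeachPartition(hc, (it, connection) => {
+ //     val table = connection.getTable(TableName.valueOf("t1"))
+ //     it.foreach(t => table.put(
+ //       new Put(t._1).addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), t._2)))
+ //     table.close()
+ //   })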
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's
+ * mapPartitions method. This will act very much like a normal DStream
+ * map partitions method but for the fact that you will now have a
+ * HBase connection while iterating through the values
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param f This function will get an iterator for a partition of the
+ * DStream along with a connection object to HBase
+ * @tparam R This is the type of objects that will go into the resulting
+ * DStream
+ * @return A resulting DStream of type R
+ */
+ def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+ f: (Iterator[T], Connection) => Iterator[R]):
+ DStream[R] = {
+ hc.streamMapPartitions(dStream, f)
+ }
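+ // Usage sketch (hypothetical names: an HBaseContext `hc` and a DStream of
+ // row keys `keyDStream`); the gets are forced with toList before the
+ // table is closed:
+ //   val valueDStream = keyDStream.hbaseMapPartitions[String](hc, (it, connection) => {
+ //     val table = connection.getTable(TableName.valueOf("t1"))
+ //     val out = it.map(rowKey =>
+ //       Bytes.toString(table.get(new Get(rowKey)).value())).toList
+ //     table.close()
+ //     out.iterator
+ //   })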
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30f7d127/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
new file mode 100644
index 0000000..fb8456d
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+/**
+ * HBaseRDDFunctions contains a set of implicit functions that can be
+ * applied to a Spark RDD so that we can easily interact with HBase
+ */
+object HBaseRDDFunctions {
+
+ /**
+ * These are implicit methods for a RDD that contains any type of
+ * data.
+ *
+ * @param rdd This is for RDDs of any type
+ * @tparam T This is any type
+ */
+ implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) {
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * put. This will not return a new RDD. Think of it like a foreach
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the put will be sent to
+ * @param f The function that will turn the RDD values
+ * into HBase Put objects.
+ */
+ def hbaseBulkPut(hc: HBaseContext,
+ tableName: TableName,
+ f: (T) => Put): Unit = {
+ hc.bulkPut(rdd, tableName, f)
+ }
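+ // Usage sketch (hypothetical names: an HBaseContext `hc` and an RDD of
+ // (rowKey, value) byte-array pairs `rdd`):
+ //   import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+ //   rdd.hbaseBulkPut(hc, TableName.valueOf("t1"), t =>
+ //     new Put(t._1).addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), t._2))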
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * get. This will return a new RDD. Think of it as an RDD map
+ * function, in that every RDD value will get a new value out of
+ * HBase. That new value will populate the newly generated RDD.
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the Gets will be sent to
+ * @param batchSize How many gets to execute in a single batch
+ * @param f The function that will turn the RDD values
+ * into HBase Get objects
+ * @param convertResult The function that will convert a HBase
+ * Result object into a value that will go
+ * into the resulting RDD
+ * @tparam R The type of Object that will be coming
+ * out of the resulting RDD
+ * @return A resulting RDD with type R objects
+ */
+ def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+ tableName: TableName, batchSize:Int,
+ f: (T) => Get, convertResult: (Result) => R): RDD[R] = {
+ hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult)
+ }
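+ // Usage sketch (hypothetical names: an HBaseContext `hc` and an RDD of
+ // row keys `keyRdd`):
+ //   val valueRdd = keyRdd.hbaseBulkGet[String](
+ //     hc, TableName.valueOf("t1"), 100,
+ //     rowKey => new Get(rowKey),
+ //     result => Bytes.toString(result.value()))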
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * get. This will return a new RDD. Think of it as an RDD map
+ * function, in that every RDD value will get a new value out of
+ * HBase. That new value will populate the newly generated RDD.
+ *
+ * @param hc The hbaseContext object to identify which
+ * HBase cluster connection to use
+ * @param tableName The tableName that the Gets will be sent to
+ * @param batchSize How many gets to execute in a single batch
+ * @param f The function that will turn the RDD values
+ * into HBase Get objects
+ * @return A resulting RDD of (ImmutableBytesWritable, Result) tuples
+ */
+ def hbaseBulkGet(hc: HBaseContext,
+ tableName: TableName, batchSize:Int,
+ f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = {
+ hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName,
+ batchSize, rdd, f,
+ result => if (result != null && result.getRow != null) {
+ (new ImmutableBytesWritable(result.getRow), result)
+ } else {
+ null
+ })
+ }
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's bulk
+ * Delete. This will not return a new RDD.
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param tableName The tableName that the deletes will be sent to
+ * @param f The function that will convert the RDD value into
+ * a HBase Delete Object
+ * @param batchSize The number of Deletes to be sent in a single batch
+ */
+ def hbaseBulkDelete(hc: HBaseContext,
+ tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = {
+ hc.bulkDelete(rdd, tableName, f, batchSize)
+ }
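+ // Usage sketch (hypothetical names: an HBaseContext `hc` and an RDD of
+ // row keys `keyRdd`):
+ //   keyRdd.hbaseBulkDelete(hc, TableName.valueOf("t1"),
+ //     rowKey => new Delete(rowKey), 100)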
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's
+ * foreachPartition method. This will act very much like a normal RDD
+ * foreach method but for the fact that you will now have a HBase connection
+ * while iterating through the values.
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param f This function will get an iterator for a partition of the
+ * RDD along with a connection object to HBase
+ */
+ def hbaseForeachPartition(hc: HBaseContext,
+ f: (Iterator[T], Connection) => Unit): Unit = {
+ hc.foreachPartition(rdd, f)
+ }
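+ // Usage sketch (hypothetical names: an HBaseContext `hc` and an RDD of
+ // (rowKey, value) byte-array pairs `rdd`), buffering writes with the
+ // connection's BufferedMutator:
+ //   rdd.hbaseForeachPartition(hc, (it, connection) => {
+ //     val mutator = connection.getBufferedMutator(TableName.valueOf("t1"))
+ //     it.foreach(t => mutator.mutate(
+ //       new Put(t._1).addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), t._2)))
+ //     mutator.flush()
+ //     mutator.close()
+ //   })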
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's
+ * mapPartitions method. This will act very much like a normal RDD
+ * map partitions method but for the fact that you will now have a
+ * HBase connection while iterating through the values
+ *
+ * @param hc The hbaseContext object to identify which HBase
+ * cluster connection to use
+ * @param f This function will get an iterator for a partition of the
+ * RDD along with a connection object to HBase
+ * @tparam R This is the type of objects that will go into the resulting
+ * RDD
+ * @return A resulting RDD of type R
+ */
+ def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+ f: (Iterator[T], Connection) => Iterator[R]):
+ RDD[R] = {
+ hc.mapPartitions[T,R](rdd, f)
+ }
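+ // Usage sketch (hypothetical names: an HBaseContext `hc` and an RDD of
+ // row keys `keyRdd`); results are materialized before the table is closed:
+ //   val cellRdd = keyRdd.hbaseMapPartitions[Array[Byte]](hc, (it, connection) => {
+ //     val table = connection.getTable(TableName.valueOf("t1"))
+ //     val out = it.map(rowKey =>
+ //       table.get(new Get(rowKey)).getValue(Bytes.toBytes("c"), Bytes.toBytes("q"))).toList
+ //     table.close()
+ //     out.iterator
+ //   })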
+ }
+}