Posted to commits@hbase.apache.org by bu...@apache.org on 2015/09/11 07:35:42 UTC

hbase git commit: HBASE-14377 JavaHBaseContextSuite isn't being run.

Repository: hbase
Updated Branches:
  refs/heads/master ae3176b9c -> fda317ceb


HBASE-14377 JavaHBaseContextSuite isn't being run.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fda317ce
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fda317ce
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fda317ce

Branch: refs/heads/master
Commit: fda317cebb5d306cabf1899e05cedb0225b2b62b
Parents: ae3176b
Author: Sean Busbey <bu...@cloudera.com>
Authored: Mon Sep 7 09:51:47 2015 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Sep 11 00:34:55 2015 -0500

----------------------------------------------------------------------
 hbase-spark/pom.xml                             |  10 +-
 .../hbase/spark/JavaHBaseContextSuite.java      | 334 ------------------
 .../hbase/spark/TestJavaHBaseContext.java       | 338 +++++++++++++++++++
 3 files changed, 347 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fda317ce/hbase-spark/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml
index 8110629..0af80ba 100644
--- a/hbase-spark/pom.xml
+++ b/hbase-spark/pom.xml
@@ -40,6 +40,7 @@
         <spark.version>1.3.0</spark.version>
         <scala.version>2.10.4</scala.version>
         <scala.binary.version>2.10</scala.binary.version>
+        <surefire.skipSecondPart>true</surefire.skipSecondPart>
         <top.dir>${project.basedir}/..</top.dir>
     </properties>
 
@@ -331,6 +332,14 @@
 
         <dependency>
             <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-annotations</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-hadoop-compat</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
@@ -507,7 +516,6 @@
 
 
     <build>
-        <testSourceDirectory>src/test/scala</testSourceDirectory>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
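
The pom changes above are what allow the Java test to run at all: the <testSourceDirectory>src/test/scala</testSourceDirectory> override pointed the module's test sources away from src/test/java, and a class named JavaHBaseContextSuite would not match Surefire's default Test*/*Test/*Tests/*TestCase include patterns in any case. Dropping the override, pulling in the hbase-annotations test jar (presumably for the @Category test classifications used below), and renaming the class to TestJavaHBaseContext brings the test back under the normal Surefire run. As a rough alternative sketch, assuming the stock maven-surefire-plugin defaults rather than any HBase-specific Surefire configuration, the include patterns could instead have been widened to pick up *Suite classes without a rename:

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
            <includes>
                <!-- keep a stock pattern and add the Suite naming used here -->
                <include>**/Test*.java</include>
                <include>**/*Suite.java</include>
            </includes>
        </configuration>
    </plugin>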

http://git-wip-us.apache.org/repos/asf/hbase/blob/fda317ce/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
deleted file mode 100644
index f19ad10..0000000
--- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import java.util.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
-import org.junit.*;
-
-import scala.Tuple2;
-
-import com.google.common.io.Files;
-
-public class JavaHBaseContextSuite implements Serializable {
-  private transient JavaSparkContext jsc;
-  HBaseTestingUtility htu;
-  protected static final Log LOG = LogFactory.getLog(JavaHBaseContextSuite.class);
-
-
-  byte[] tableName = Bytes.toBytes("t1");
-  byte[] columnFamily = Bytes.toBytes("c");
-  String columnFamilyStr = Bytes.toString(columnFamily);
-
-  @Before
-  public void setUp() {
-    jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
-    jsc.addJar("spark.jar");
-
-    File tempDir = Files.createTempDir();
-    tempDir.deleteOnExit();
-
-    htu = HBaseTestingUtility.createLocalHTU();
-    try {
-      LOG.info("cleaning up test dir");
-
-      htu.cleanupTestDir();
-
-      LOG.info("starting minicluster");
-
-      htu.startMiniZKCluster();
-      htu.startMiniHBaseCluster(1, 1);
-
-      LOG.info(" - minicluster started");
-
-      try {
-        htu.deleteTable(TableName.valueOf(tableName));
-      } catch (Exception e) {
-        LOG.info(" - no table " + Bytes.toString(tableName) + " found");
-      }
-
-      LOG.info(" - creating table " + Bytes.toString(tableName));
-      htu.createTable(TableName.valueOf(tableName),
-              columnFamily);
-      LOG.info(" - created table");
-    } catch (Exception e1) {
-      throw new RuntimeException(e1);
-    }
-  }
-
-  @After
-  public void tearDown() {
-    try {
-      htu.deleteTable(TableName.valueOf(tableName));
-      LOG.info("shuting down minicluster");
-      htu.shutdownMiniHBaseCluster();
-      htu.shutdownMiniZKCluster();
-      LOG.info(" - minicluster shut down");
-      htu.cleanupTestDir();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    jsc.stop();
-    jsc = null;
-  }
-
-  @Test
-  public void testBulkPut() throws IOException {
-
-    List<String> list = new ArrayList<>();
-    list.add("1," + columnFamilyStr + ",a,1");
-    list.add("2," + columnFamilyStr + ",a,2");
-    list.add("3," + columnFamilyStr + ",a,3");
-    list.add("4," + columnFamilyStr + ",a,4");
-    list.add("5," + columnFamilyStr + ",a,5");
-
-    JavaRDD<String> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    Connection conn = ConnectionFactory.createConnection(conf);
-    Table table = conn.getTable(TableName.valueOf(tableName));
-
-    try {
-      List<Delete> deletes = new ArrayList<>();
-      for (int i = 1; i < 6; i++) {
-        deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
-      }
-      table.delete(deletes);
-    } finally {
-      table.close();
-    }
-
-    hbaseContext.bulkPut(rdd,
-            TableName.valueOf(tableName),
-            new PutFunction());
-
-    table = conn.getTable(TableName.valueOf(tableName));
-
-    try {
-      Result result1 = table.get(new Get(Bytes.toBytes("1")));
-      Assert.assertNotNull("Row 1 should have been put", result1.getRow());
-
-      Result result2 = table.get(new Get(Bytes.toBytes("2")));
-      Assert.assertNotNull("Row 2 should have been put", result2.getRow());
-
-      Result result3 = table.get(new Get(Bytes.toBytes("3")));
-      Assert.assertNotNull("Row 3 should have been put", result3.getRow());
-
-      Result result4 = table.get(new Get(Bytes.toBytes("4")));
-      Assert.assertNotNull("Row 4 should have been put", result4.getRow());
-
-      Result result5 = table.get(new Get(Bytes.toBytes("5")));
-      Assert.assertNotNull("Row 5 should have been put", result5.getRow());
-    } finally {
-      table.close();
-      conn.close();
-    }
-  }
-
-  public static class PutFunction implements Function<String, Put> {
-
-    private static final long serialVersionUID = 1L;
-
-    public Put call(String v) throws Exception {
-      String[] cells = v.split(",");
-      Put put = new Put(Bytes.toBytes(cells[0]));
-
-      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
-              Bytes.toBytes(cells[3]));
-      return put;
-    }
-  }
-
-  @Test
-  public void testBulkDelete() throws IOException {
-    List<byte[]> list = new ArrayList<>();
-    list.add(Bytes.toBytes("1"));
-    list.add(Bytes.toBytes("2"));
-    list.add(Bytes.toBytes("3"));
-
-    JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-
-    populateTableWithMockData(conf, TableName.valueOf(tableName));
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
-            new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
-
-
-
-    try (
-            Connection conn = ConnectionFactory.createConnection(conf);
-            Table table = conn.getTable(TableName.valueOf(tableName))
-    ){
-      Result result1 = table.get(new Get(Bytes.toBytes("1")));
-      Assert.assertNull("Row 1 should have been deleted", result1.getRow());
-
-      Result result2 = table.get(new Get(Bytes.toBytes("2")));
-      Assert.assertNull("Row 2 should have been deleted", result2.getRow());
-
-      Result result3 = table.get(new Get(Bytes.toBytes("3")));
-      Assert.assertNull("Row 3 should have been deleted", result3.getRow());
-
-      Result result4 = table.get(new Get(Bytes.toBytes("4")));
-      Assert.assertNotNull("Row 4 should not have been deleted", result4.getRow());
-
-      Result result5 = table.get(new Get(Bytes.toBytes("5")));
-      Assert.assertNotNull("Row 5 should not have been deleted", result5.getRow());
-    }
-  }
-
-  @Test
-  public void testDistributedScan() throws IOException {
-    Configuration conf = htu.getConfiguration();
-
-    populateTableWithMockData(conf, TableName.valueOf(tableName));
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    Scan scan = new Scan();
-    scan.setCaching(100);
-
-    JavaRDD<String> javaRdd =
-            hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
-                    .map(new ScanConvertFunction());
-
-    List<String> results = javaRdd.collect();
-
-    Assert.assertEquals(results.size(), 5);
-  }
-
-  private static class ScanConvertFunction implements
-          Function<Tuple2<ImmutableBytesWritable, Result>, String> {
-    @Override
-    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
-      return Bytes.toString(v1._1().copyBytes());
-    }
-  }
-
-  @Test
-  public void testBulkGet() throws IOException {
-    List<byte[]> list = new ArrayList<>();
-    list.add(Bytes.toBytes("1"));
-    list.add(Bytes.toBytes("2"));
-    list.add(Bytes.toBytes("3"));
-    list.add(Bytes.toBytes("4"));
-    list.add(Bytes.toBytes("5"));
-
-    JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-
-    populateTableWithMockData(conf, TableName.valueOf(tableName));
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    final JavaRDD<String> stringJavaRDD =
-            hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
-            new GetFunction(),
-            new ResultFunction());
-
-    Assert.assertEquals(stringJavaRDD.count(), 5);
-  }
-
-  public static class GetFunction implements Function<byte[], Get> {
-
-    private static final long serialVersionUID = 1L;
-
-    public Get call(byte[] v) throws Exception {
-      return new Get(v);
-    }
-  }
-
-  public static class ResultFunction implements Function<Result, String> {
-
-    private static final long serialVersionUID = 1L;
-
-    public String call(Result result) throws Exception {
-      Iterator<Cell> it = result.listCells().iterator();
-      StringBuilder b = new StringBuilder();
-
-      b.append(Bytes.toString(result.getRow())).append(":");
-
-      while (it.hasNext()) {
-        Cell cell = it.next();
-        String q = Bytes.toString(CellUtil.cloneQualifier(cell));
-        if ("counter".equals(q)) {
-          b.append("(")
-                  .append(q)
-                  .append(",")
-                  .append(Bytes.toLong(CellUtil.cloneValue(cell)))
-                  .append(")");
-        } else {
-          b.append("(")
-                  .append(q)
-                  .append(",")
-                  .append(Bytes.toString(CellUtil.cloneValue(cell)))
-                  .append(")");
-        }
-      }
-      return b.toString();
-    }
-  }
-
-  private void populateTableWithMockData(Configuration conf, TableName tableName)
-          throws IOException {
-    try (
-      Connection conn = ConnectionFactory.createConnection(conf);
-      Table table = conn.getTable(tableName)) {
-
-      List<Put> puts = new ArrayList<>();
-
-      for (int i = 1; i < 6; i++) {
-        Put put = new Put(Bytes.toBytes(Integer.toString(i)));
-        put.addColumn(columnFamily, columnFamily, columnFamily);
-        puts.add(put);
-      }
-      table.put(puts);
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/fda317ce/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
new file mode 100644
index 0000000..724ac36
--- /dev/null
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.junit.*;
+import org.junit.experimental.categories.Category;
+
+import scala.Tuple2;
+
+import com.google.common.io.Files;
+
+@Category({MiscTests.class, MediumTests.class})
+public class TestJavaHBaseContext implements Serializable {
+  private transient JavaSparkContext jsc;
+  HBaseTestingUtility htu;
+  protected static final Log LOG = LogFactory.getLog(TestJavaHBaseContext.class);
+
+
+  byte[] tableName = Bytes.toBytes("t1");
+  byte[] columnFamily = Bytes.toBytes("c");
+  String columnFamilyStr = Bytes.toString(columnFamily);
+
+  @Before
+  public void setUp() {
+    jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
+    jsc.addJar("spark.jar");
+
+    File tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
+
+    htu = HBaseTestingUtility.createLocalHTU();
+    try {
+      LOG.info("cleaning up test dir");
+
+      htu.cleanupTestDir();
+
+      LOG.info("starting minicluster");
+
+      htu.startMiniZKCluster();
+      htu.startMiniHBaseCluster(1, 1);
+
+      LOG.info(" - minicluster started");
+
+      try {
+        htu.deleteTable(TableName.valueOf(tableName));
+      } catch (Exception e) {
+        LOG.info(" - no table " + Bytes.toString(tableName) + " found");
+      }
+
+      LOG.info(" - creating table " + Bytes.toString(tableName));
+      htu.createTable(TableName.valueOf(tableName),
+              columnFamily);
+      LOG.info(" - created table");
+    } catch (Exception e1) {
+      throw new RuntimeException(e1);
+    }
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      htu.deleteTable(TableName.valueOf(tableName));
+      LOG.info("shuting down minicluster");
+      htu.shutdownMiniHBaseCluster();
+      htu.shutdownMiniZKCluster();
+      LOG.info(" - minicluster shut down");
+      htu.cleanupTestDir();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    jsc.stop();
+    jsc = null;
+  }
+
+  @Test
+  public void testBulkPut() throws IOException {
+
+    List<String> list = new ArrayList<>();
+    list.add("1," + columnFamilyStr + ",a,1");
+    list.add("2," + columnFamilyStr + ",a,2");
+    list.add("3," + columnFamilyStr + ",a,3");
+    list.add("4," + columnFamilyStr + ",a,4");
+    list.add("5," + columnFamilyStr + ",a,5");
+
+    JavaRDD<String> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    Connection conn = ConnectionFactory.createConnection(conf);
+    Table table = conn.getTable(TableName.valueOf(tableName));
+
+    try {
+      List<Delete> deletes = new ArrayList<>();
+      for (int i = 1; i < 6; i++) {
+        deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
+      }
+      table.delete(deletes);
+    } finally {
+      table.close();
+    }
+
+    hbaseContext.bulkPut(rdd,
+            TableName.valueOf(tableName),
+            new PutFunction());
+
+    table = conn.getTable(TableName.valueOf(tableName));
+
+    try {
+      Result result1 = table.get(new Get(Bytes.toBytes("1")));
+      Assert.assertNotNull("Row 1 should have been put", result1.getRow());
+
+      Result result2 = table.get(new Get(Bytes.toBytes("2")));
+      Assert.assertNotNull("Row 2 should have been put", result2.getRow());
+
+      Result result3 = table.get(new Get(Bytes.toBytes("3")));
+      Assert.assertNotNull("Row 3 should have been put", result3.getRow());
+
+      Result result4 = table.get(new Get(Bytes.toBytes("4")));
+      Assert.assertNotNull("Row 4 should have been put", result4.getRow());
+
+      Result result5 = table.get(new Get(Bytes.toBytes("5")));
+      Assert.assertNotNull("Row 5 should have been put", result5.getRow());
+    } finally {
+      table.close();
+      conn.close();
+    }
+  }
+
+  public static class PutFunction implements Function<String, Put> {
+
+    private static final long serialVersionUID = 1L;
+
+    public Put call(String v) throws Exception {
+      String[] cells = v.split(",");
+      Put put = new Put(Bytes.toBytes(cells[0]));
+
+      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
+              Bytes.toBytes(cells[3]));
+      return put;
+    }
+  }
+
+  @Test
+  public void testBulkDelete() throws IOException {
+    List<byte[]> list = new ArrayList<>();
+    list.add(Bytes.toBytes("1"));
+    list.add(Bytes.toBytes("2"));
+    list.add(Bytes.toBytes("3"));
+
+    JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
+            new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
+
+
+
+    try (
+            Connection conn = ConnectionFactory.createConnection(conf);
+            Table table = conn.getTable(TableName.valueOf(tableName))
+    ){
+      Result result1 = table.get(new Get(Bytes.toBytes("1")));
+      Assert.assertNull("Row 1 should have been deleted", result1.getRow());
+
+      Result result2 = table.get(new Get(Bytes.toBytes("2")));
+      Assert.assertNull("Row 2 should have been deleted", result2.getRow());
+
+      Result result3 = table.get(new Get(Bytes.toBytes("3")));
+      Assert.assertNull("Row 3 should have been deleted", result3.getRow());
+
+      Result result4 = table.get(new Get(Bytes.toBytes("4")));
+      Assert.assertNotNull("Row 4 should not have been deleted", result4.getRow());
+
+      Result result5 = table.get(new Get(Bytes.toBytes("5")));
+      Assert.assertNotNull("Row 5 should not have been deleted", result5.getRow());
+    }
+  }
+
+  @Test
+  public void testDistributedScan() throws IOException {
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    Scan scan = new Scan();
+    scan.setCaching(100);
+
+    JavaRDD<String> javaRdd =
+            hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
+                    .map(new ScanConvertFunction());
+
+    List<String> results = javaRdd.collect();
+
+    Assert.assertEquals(results.size(), 5);
+  }
+
+  private static class ScanConvertFunction implements
+          Function<Tuple2<ImmutableBytesWritable, Result>, String> {
+    @Override
+    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
+      return Bytes.toString(v1._1().copyBytes());
+    }
+  }
+
+  @Test
+  public void testBulkGet() throws IOException {
+    List<byte[]> list = new ArrayList<>();
+    list.add(Bytes.toBytes("1"));
+    list.add(Bytes.toBytes("2"));
+    list.add(Bytes.toBytes("3"));
+    list.add(Bytes.toBytes("4"));
+    list.add(Bytes.toBytes("5"));
+
+    JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+
+    populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    final JavaRDD<String> stringJavaRDD =
+            hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
+            new GetFunction(),
+            new ResultFunction());
+
+    Assert.assertEquals(stringJavaRDD.count(), 5);
+  }
+
+  public static class GetFunction implements Function<byte[], Get> {
+
+    private static final long serialVersionUID = 1L;
+
+    public Get call(byte[] v) throws Exception {
+      return new Get(v);
+    }
+  }
+
+  public static class ResultFunction implements Function<Result, String> {
+
+    private static final long serialVersionUID = 1L;
+
+    public String call(Result result) throws Exception {
+      Iterator<Cell> it = result.listCells().iterator();
+      StringBuilder b = new StringBuilder();
+
+      b.append(Bytes.toString(result.getRow())).append(":");
+
+      while (it.hasNext()) {
+        Cell cell = it.next();
+        String q = Bytes.toString(CellUtil.cloneQualifier(cell));
+        if ("counter".equals(q)) {
+          b.append("(")
+                  .append(q)
+                  .append(",")
+                  .append(Bytes.toLong(CellUtil.cloneValue(cell)))
+                  .append(")");
+        } else {
+          b.append("(")
+                  .append(q)
+                  .append(",")
+                  .append(Bytes.toString(CellUtil.cloneValue(cell)))
+                  .append(")");
+        }
+      }
+      return b.toString();
+    }
+  }
+
+  private void populateTableWithMockData(Configuration conf, TableName tableName)
+          throws IOException {
+    try (
+      Connection conn = ConnectionFactory.createConnection(conf);
+      Table table = conn.getTable(tableName)) {
+
+      List<Put> puts = new ArrayList<>();
+
+      for (int i = 1; i < 6; i++) {
+        Put put = new Put(Bytes.toBytes(Integer.toString(i)));
+        put.addColumn(columnFamily, columnFamily, columnFamily);
+        puts.add(put);
+      }
+      table.put(puts);
+    }
+  }
+
+}
\ No newline at end of file
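
With the rename in place, the class is picked up by a normal test run of the hbase-spark module. For a quick local check (plain Maven options, nothing HBase-specific assumed), something like

    mvn test -pl hbase-spark -Dtest=TestJavaHBaseContext

from the top of the repository should run just this test.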