You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2015/09/11 07:35:42 UTC
hbase git commit: HBASE-14377 JavaHBaseContextSuite isn't being run.
Repository: hbase
Updated Branches:
refs/heads/master ae3176b9c -> fda317ceb
HBASE-14377 JavaHBaseContextSuite isn't being run.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fda317ce
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fda317ce
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fda317ce
Branch: refs/heads/master
Commit: fda317cebb5d306cabf1899e05cedb0225b2b62b
Parents: ae3176b
Author: Sean Busbey <bu...@cloudera.com>
Authored: Mon Sep 7 09:51:47 2015 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Fri Sep 11 00:34:55 2015 -0500
----------------------------------------------------------------------
hbase-spark/pom.xml | 10 +-
.../hbase/spark/JavaHBaseContextSuite.java | 334 ------------------
.../hbase/spark/TestJavaHBaseContext.java | 338 +++++++++++++++++++
3 files changed, 347 insertions(+), 335 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fda317ce/hbase-spark/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml
index 8110629..0af80ba 100644
--- a/hbase-spark/pom.xml
+++ b/hbase-spark/pom.xml
@@ -40,6 +40,7 @@
<spark.version>1.3.0</spark.version>
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
+ <surefire.skipSecondPart>true</surefire.skipSecondPart>
<top.dir>${project.basedir}/..</top.dir>
</properties>
@@ -331,6 +332,14 @@
<dependency>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-annotations</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
<version>${project.version}</version>
<scope>test</scope>
@@ -507,7 +516,6 @@
<build>
- <testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
http://git-wip-us.apache.org/repos/asf/hbase/blob/fda317ce/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
deleted file mode 100644
index f19ad10..0000000
--- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import java.util.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
-import org.junit.*;
-
-import scala.Tuple2;
-
-import com.google.common.io.Files;
-
-public class JavaHBaseContextSuite implements Serializable {
- private transient JavaSparkContext jsc;
- HBaseTestingUtility htu;
- protected static final Log LOG = LogFactory.getLog(JavaHBaseContextSuite.class);
-
-
- byte[] tableName = Bytes.toBytes("t1");
- byte[] columnFamily = Bytes.toBytes("c");
- String columnFamilyStr = Bytes.toString(columnFamily);
-
- @Before
- public void setUp() {
- jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
- jsc.addJar("spark.jar");
-
- File tempDir = Files.createTempDir();
- tempDir.deleteOnExit();
-
- htu = HBaseTestingUtility.createLocalHTU();
- try {
- LOG.info("cleaning up test dir");
-
- htu.cleanupTestDir();
-
- LOG.info("starting minicluster");
-
- htu.startMiniZKCluster();
- htu.startMiniHBaseCluster(1, 1);
-
- LOG.info(" - minicluster started");
-
- try {
- htu.deleteTable(TableName.valueOf(tableName));
- } catch (Exception e) {
- LOG.info(" - no table " + Bytes.toString(tableName) + " found");
- }
-
- LOG.info(" - creating table " + Bytes.toString(tableName));
- htu.createTable(TableName.valueOf(tableName),
- columnFamily);
- LOG.info(" - created table");
- } catch (Exception e1) {
- throw new RuntimeException(e1);
- }
- }
-
- @After
- public void tearDown() {
- try {
- htu.deleteTable(TableName.valueOf(tableName));
- LOG.info("shuting down minicluster");
- htu.shutdownMiniHBaseCluster();
- htu.shutdownMiniZKCluster();
- LOG.info(" - minicluster shut down");
- htu.cleanupTestDir();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- jsc.stop();
- jsc = null;
- }
-
- @Test
- public void testBulkPut() throws IOException {
-
- List<String> list = new ArrayList<>();
- list.add("1," + columnFamilyStr + ",a,1");
- list.add("2," + columnFamilyStr + ",a,2");
- list.add("3," + columnFamilyStr + ",a,3");
- list.add("4," + columnFamilyStr + ",a,4");
- list.add("5," + columnFamilyStr + ",a,5");
-
- JavaRDD<String> rdd = jsc.parallelize(list);
-
- Configuration conf = htu.getConfiguration();
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- Connection conn = ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(TableName.valueOf(tableName));
-
- try {
- List<Delete> deletes = new ArrayList<>();
- for (int i = 1; i < 6; i++) {
- deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
- }
- table.delete(deletes);
- } finally {
- table.close();
- }
-
- hbaseContext.bulkPut(rdd,
- TableName.valueOf(tableName),
- new PutFunction());
-
- table = conn.getTable(TableName.valueOf(tableName));
-
- try {
- Result result1 = table.get(new Get(Bytes.toBytes("1")));
- Assert.assertNotNull("Row 1 should had been deleted", result1.getRow());
-
- Result result2 = table.get(new Get(Bytes.toBytes("2")));
- Assert.assertNotNull("Row 2 should had been deleted", result2.getRow());
-
- Result result3 = table.get(new Get(Bytes.toBytes("3")));
- Assert.assertNotNull("Row 3 should had been deleted", result3.getRow());
-
- Result result4 = table.get(new Get(Bytes.toBytes("4")));
- Assert.assertNotNull("Row 4 should had been deleted", result4.getRow());
-
- Result result5 = table.get(new Get(Bytes.toBytes("5")));
- Assert.assertNotNull("Row 5 should had been deleted", result5.getRow());
- } finally {
- table.close();
- conn.close();
- }
- }
-
- public static class PutFunction implements Function<String, Put> {
-
- private static final long serialVersionUID = 1L;
-
- public Put call(String v) throws Exception {
- String[] cells = v.split(",");
- Put put = new Put(Bytes.toBytes(cells[0]));
-
- put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
- Bytes.toBytes(cells[3]));
- return put;
- }
- }
-
- @Test
- public void testBulkDelete() throws IOException {
- List<byte[]> list = new ArrayList<>();
- list.add(Bytes.toBytes("1"));
- list.add(Bytes.toBytes("2"));
- list.add(Bytes.toBytes("3"));
-
- JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
- Configuration conf = htu.getConfiguration();
-
- populateTableWithMockData(conf, TableName.valueOf(tableName));
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
- new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
-
-
-
- try (
- Connection conn = ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(TableName.valueOf(tableName))
- ){
- Result result1 = table.get(new Get(Bytes.toBytes("1")));
- Assert.assertNull("Row 1 should had been deleted", result1.getRow());
-
- Result result2 = table.get(new Get(Bytes.toBytes("2")));
- Assert.assertNull("Row 2 should had been deleted", result2.getRow());
-
- Result result3 = table.get(new Get(Bytes.toBytes("3")));
- Assert.assertNull("Row 3 should had been deleted", result3.getRow());
-
- Result result4 = table.get(new Get(Bytes.toBytes("4")));
- Assert.assertNotNull("Row 4 should had been deleted", result4.getRow());
-
- Result result5 = table.get(new Get(Bytes.toBytes("5")));
- Assert.assertNotNull("Row 5 should had been deleted", result5.getRow());
- }
- }
-
- @Test
- public void testDistributedScan() throws IOException {
- Configuration conf = htu.getConfiguration();
-
- populateTableWithMockData(conf, TableName.valueOf(tableName));
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- Scan scan = new Scan();
- scan.setCaching(100);
-
- JavaRDD<String> javaRdd =
- hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
- .map(new ScanConvertFunction());
-
- List<String> results = javaRdd.collect();
-
- Assert.assertEquals(results.size(), 5);
- }
-
- private static class ScanConvertFunction implements
- Function<Tuple2<ImmutableBytesWritable, Result>, String> {
- @Override
- public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
- return Bytes.toString(v1._1().copyBytes());
- }
- }
-
- @Test
- public void testBulkGet() throws IOException {
- List<byte[]> list = new ArrayList<>();
- list.add(Bytes.toBytes("1"));
- list.add(Bytes.toBytes("2"));
- list.add(Bytes.toBytes("3"));
- list.add(Bytes.toBytes("4"));
- list.add(Bytes.toBytes("5"));
-
- JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
- Configuration conf = htu.getConfiguration();
-
- populateTableWithMockData(conf, TableName.valueOf(tableName));
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- final JavaRDD<String> stringJavaRDD =
- hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
- new GetFunction(),
- new ResultFunction());
-
- Assert.assertEquals(stringJavaRDD.count(), 5);
- }
-
- public static class GetFunction implements Function<byte[], Get> {
-
- private static final long serialVersionUID = 1L;
-
- public Get call(byte[] v) throws Exception {
- return new Get(v);
- }
- }
-
- public static class ResultFunction implements Function<Result, String> {
-
- private static final long serialVersionUID = 1L;
-
- public String call(Result result) throws Exception {
- Iterator<Cell> it = result.listCells().iterator();
- StringBuilder b = new StringBuilder();
-
- b.append(Bytes.toString(result.getRow())).append(":");
-
- while (it.hasNext()) {
- Cell cell = it.next();
- String q = Bytes.toString(CellUtil.cloneQualifier(cell));
- if ("counter".equals(q)) {
- b.append("(")
- .append(q)
- .append(",")
- .append(Bytes.toLong(CellUtil.cloneValue(cell)))
- .append(")");
- } else {
- b.append("(")
- .append(q)
- .append(",")
- .append(Bytes.toString(CellUtil.cloneValue(cell)))
- .append(")");
- }
- }
- return b.toString();
- }
- }
-
- private void populateTableWithMockData(Configuration conf, TableName tableName)
- throws IOException {
- try (
- Connection conn = ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(tableName)) {
-
- List<Put> puts = new ArrayList<>();
-
- for (int i = 1; i < 6; i++) {
- Put put = new Put(Bytes.toBytes(Integer.toString(i)));
- put.addColumn(columnFamily, columnFamily, columnFamily);
- puts.add(put);
- }
- table.put(puts);
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/fda317ce/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
new file mode 100644
index 0000000..724ac36
--- /dev/null
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.junit.*;
+import org.junit.experimental.categories.Category;
+
+import scala.Tuple2;
+
+import com.google.common.io.Files;
+
+@Category({MiscTests.class, MediumTests.class})
+public class TestJavaHBaseContext implements Serializable {
+ private transient JavaSparkContext jsc;
+ HBaseTestingUtility htu;
+ protected static final Log LOG = LogFactory.getLog(TestJavaHBaseContext.class);
+
+
+ byte[] tableName = Bytes.toBytes("t1");
+ byte[] columnFamily = Bytes.toBytes("c");
+ String columnFamilyStr = Bytes.toString(columnFamily);
+
+ @Before
+ public void setUp() {
+ jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
+ jsc.addJar("spark.jar");
+
+ File tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
+
+ htu = HBaseTestingUtility.createLocalHTU();
+ try {
+ LOG.info("cleaning up test dir");
+
+ htu.cleanupTestDir();
+
+ LOG.info("starting minicluster");
+
+ htu.startMiniZKCluster();
+ htu.startMiniHBaseCluster(1, 1);
+
+ LOG.info(" - minicluster started");
+
+ try {
+ htu.deleteTable(TableName.valueOf(tableName));
+ } catch (Exception e) {
+ LOG.info(" - no table " + Bytes.toString(tableName) + " found");
+ }
+
+ LOG.info(" - creating table " + Bytes.toString(tableName));
+ htu.createTable(TableName.valueOf(tableName),
+ columnFamily);
+ LOG.info(" - created table");
+ } catch (Exception e1) {
+ throw new RuntimeException(e1);
+ }
+ }
+
+ @After
+ public void tearDown() {
+ try {
+ htu.deleteTable(TableName.valueOf(tableName));
+ LOG.info("shuting down minicluster");
+ htu.shutdownMiniHBaseCluster();
+ htu.shutdownMiniZKCluster();
+ LOG.info(" - minicluster shut down");
+ htu.cleanupTestDir();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ jsc.stop();
+ jsc = null;
+ }
+
+ @Test
+ public void testBulkPut() throws IOException {
+
+ List<String> list = new ArrayList<>();
+ list.add("1," + columnFamilyStr + ",a,1");
+ list.add("2," + columnFamilyStr + ",a,2");
+ list.add("3," + columnFamilyStr + ",a,3");
+ list.add("4," + columnFamilyStr + ",a,4");
+ list.add("5," + columnFamilyStr + ",a,5");
+
+ JavaRDD<String> rdd = jsc.parallelize(list);
+
+ Configuration conf = htu.getConfiguration();
+
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(TableName.valueOf(tableName));
+
+ try {
+ List<Delete> deletes = new ArrayList<>();
+ for (int i = 1; i < 6; i++) {
+ deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
+ }
+ table.delete(deletes);
+ } finally {
+ table.close();
+ }
+
+ hbaseContext.bulkPut(rdd,
+ TableName.valueOf(tableName),
+ new PutFunction());
+
+ table = conn.getTable(TableName.valueOf(tableName));
+
+ try {
+ Result result1 = table.get(new Get(Bytes.toBytes("1")));
+ Assert.assertNotNull("Row 1 should had been deleted", result1.getRow());
+
+ Result result2 = table.get(new Get(Bytes.toBytes("2")));
+ Assert.assertNotNull("Row 2 should had been deleted", result2.getRow());
+
+ Result result3 = table.get(new Get(Bytes.toBytes("3")));
+ Assert.assertNotNull("Row 3 should had been deleted", result3.getRow());
+
+ Result result4 = table.get(new Get(Bytes.toBytes("4")));
+ Assert.assertNotNull("Row 4 should had been deleted", result4.getRow());
+
+ Result result5 = table.get(new Get(Bytes.toBytes("5")));
+ Assert.assertNotNull("Row 5 should had been deleted", result5.getRow());
+ } finally {
+ table.close();
+ conn.close();
+ }
+ }
+
+ public static class PutFunction implements Function<String, Put> {
+
+ private static final long serialVersionUID = 1L;
+
+ public Put call(String v) throws Exception {
+ String[] cells = v.split(",");
+ Put put = new Put(Bytes.toBytes(cells[0]));
+
+ put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
+ Bytes.toBytes(cells[3]));
+ return put;
+ }
+ }
+
+ @Test
+ public void testBulkDelete() throws IOException {
+ List<byte[]> list = new ArrayList<>();
+ list.add(Bytes.toBytes("1"));
+ list.add(Bytes.toBytes("2"));
+ list.add(Bytes.toBytes("3"));
+
+ JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+ Configuration conf = htu.getConfiguration();
+
+ populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
+ new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
+
+
+
+ try (
+ Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(TableName.valueOf(tableName))
+ ){
+ Result result1 = table.get(new Get(Bytes.toBytes("1")));
+ Assert.assertNull("Row 1 should had been deleted", result1.getRow());
+
+ Result result2 = table.get(new Get(Bytes.toBytes("2")));
+ Assert.assertNull("Row 2 should had been deleted", result2.getRow());
+
+ Result result3 = table.get(new Get(Bytes.toBytes("3")));
+ Assert.assertNull("Row 3 should had been deleted", result3.getRow());
+
+ Result result4 = table.get(new Get(Bytes.toBytes("4")));
+ Assert.assertNotNull("Row 4 should had been deleted", result4.getRow());
+
+ Result result5 = table.get(new Get(Bytes.toBytes("5")));
+ Assert.assertNotNull("Row 5 should had been deleted", result5.getRow());
+ }
+ }
+
+ @Test
+ public void testDistributedScan() throws IOException {
+ Configuration conf = htu.getConfiguration();
+
+ populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ Scan scan = new Scan();
+ scan.setCaching(100);
+
+ JavaRDD<String> javaRdd =
+ hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
+ .map(new ScanConvertFunction());
+
+ List<String> results = javaRdd.collect();
+
+ Assert.assertEquals(results.size(), 5);
+ }
+
+ private static class ScanConvertFunction implements
+ Function<Tuple2<ImmutableBytesWritable, Result>, String> {
+ @Override
+ public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
+ return Bytes.toString(v1._1().copyBytes());
+ }
+ }
+
+ @Test
+ public void testBulkGet() throws IOException {
+ List<byte[]> list = new ArrayList<>();
+ list.add(Bytes.toBytes("1"));
+ list.add(Bytes.toBytes("2"));
+ list.add(Bytes.toBytes("3"));
+ list.add(Bytes.toBytes("4"));
+ list.add(Bytes.toBytes("5"));
+
+ JavaRDD<byte[]> rdd = jsc.parallelize(list);
+
+ Configuration conf = htu.getConfiguration();
+
+ populateTableWithMockData(conf, TableName.valueOf(tableName));
+
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ final JavaRDD<String> stringJavaRDD =
+ hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
+ new GetFunction(),
+ new ResultFunction());
+
+ Assert.assertEquals(stringJavaRDD.count(), 5);
+ }
+
+ public static class GetFunction implements Function<byte[], Get> {
+
+ private static final long serialVersionUID = 1L;
+
+ public Get call(byte[] v) throws Exception {
+ return new Get(v);
+ }
+ }
+
+ public static class ResultFunction implements Function<Result, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ public String call(Result result) throws Exception {
+ Iterator<Cell> it = result.listCells().iterator();
+ StringBuilder b = new StringBuilder();
+
+ b.append(Bytes.toString(result.getRow())).append(":");
+
+ while (it.hasNext()) {
+ Cell cell = it.next();
+ String q = Bytes.toString(CellUtil.cloneQualifier(cell));
+ if ("counter".equals(q)) {
+ b.append("(")
+ .append(q)
+ .append(",")
+ .append(Bytes.toLong(CellUtil.cloneValue(cell)))
+ .append(")");
+ } else {
+ b.append("(")
+ .append(q)
+ .append(",")
+ .append(Bytes.toString(CellUtil.cloneValue(cell)))
+ .append(")");
+ }
+ }
+ return b.toString();
+ }
+ }
+
+ private void populateTableWithMockData(Configuration conf, TableName tableName)
+ throws IOException {
+ try (
+ Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(tableName)) {
+
+ List<Put> puts = new ArrayList<>();
+
+ for (int i = 1; i < 6; i++) {
+ Put put = new Put(Bytes.toBytes(Integer.toString(i)));
+ put.addColumn(columnFamily, columnFamily, columnFamily);
+ puts.add(put);
+ }
+ table.put(puts);
+ }
+ }
+
+}
\ No newline at end of file