Posted to issues@spark.apache.org by "Bhavya Raj Sharma (Jira)" <ji...@apache.org> on 2021/12/20 08:10:00 UTC
[jira] [Comment Edited] (SPARK-37660) Spark-3.2.0 Fetch Hbase Data not working
[ https://issues.apache.org/jira/browse/SPARK-37660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17462441#comment-17462441 ]
Bhavya Raj Sharma edited comment on SPARK-37660 at 12/20/21, 8:09 AM:
----------------------------------------------------------------------
We are not getting any exception or output. It's just that the call returns an empty RDD with Spark version 3.2.0.
The same code worked in 3.1.2.
was (Author: JIRAUSER282028):
We are not getting any exception or output. It's just that the call returns an empty RDD with Spark version 3.2.0.
The same code worked in 3.1.2 as well.
> Spark-3.2.0 Fetch Hbase Data not working
> ----------------------------------------
>
> Key: SPARK-37660
> URL: https://issues.apache.org/jira/browse/SPARK-37660
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.2.0
> Environment: Hadoop version : hadoop-2.9.2
> HBase version : hbase-2.2.5
> Spark version : spark-3.2.0-bin-without-hadoop
> java version : jdk1.8.0_151
> scala version : scala-sdk-2.12.10
> os version : Red Hat Enterprise Linux Server release 6.6 (Santiago)
> Reporter: Bhavya Raj Sharma
> Priority: Major
>
> Below is a sample code snippet used to fetch data from HBase. This used to work fine with spark-3.1.1.
> However, after upgrading to spark-3.2.0 it no longer works. The issue is that it does not throw any exception; it just does not fill the RDD.
>
> {code:java}
> import java.util.Base64
>
> import org.apache.hadoop.hbase.client.{Result, Scan}
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.{NewHadoopRDD, RDD}
>
> def getInfo(sc: SparkContext, startDate: String, cachingValue: Int, sparkLoggerParams: SparkLoggerParams, zkIP: String, zkPort: String): RDD[String] = {
>   val scan = new Scan
>   scan.addFamily(Bytes.toBytes("family"))
>   scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("time"))
>   val rdd = getHbaseConfiguredRDDFromScan(sc, zkIP, zkPort, "myTable", scan, cachingValue, sparkLoggerParams)
>   // Extract each row key from the (ImmutableBytesWritable, Result) pairs as a String.
>   val output: RDD[String] = rdd.map { row =>
>     Bytes.toString(row._2.getRow)
>   }
>   output
> }
>
> def getHbaseConfiguredRDDFromScan(sc: SparkContext, zkIP: String, zkPort: String, tableName: String,
>     scan: Scan, cachingValue: Int, sparkLoggerParams: SparkLoggerParams): NewHadoopRDD[ImmutableBytesWritable, Result] = {
>   scan.setCaching(cachingValue)
>   // TableInputFormat expects the Scan serialized as a Base64-encoded protobuf string.
>   val scanString = Base64.getEncoder.encodeToString(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(scan).toByteArray)
>   // SparkHBaseContext and SparkLoggerParams are helper classes from our own codebase.
>   val hbaseContext = new SparkHBaseContext(zkIP, zkPort)
>   val hbaseConfig = hbaseContext.getConfiguration()
>   hbaseConfig.set(TableInputFormat.INPUT_TABLE, tableName)
>   hbaseConfig.set(TableInputFormat.SCAN, scanString)
>   sc.newAPIHadoopRDD(
>     hbaseConfig,
>     classOf[TableInputFormat],
>     classOf[ImmutableBytesWritable], classOf[Result]
>   ).asInstanceOf[NewHadoopRDD[ImmutableBytesWritable, Result]]
> }
> {code}
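>
> For context, a minimal sketch of how this function is invoked (the argument values below are placeholders, not our real ones):
>
> {code:java}
> // Hypothetical call; the date, caching value, and ZooKeeper host/port
> // are placeholder values.
> val rowKeys = getInfo(sc, "2021-12-01", 100, sparkLoggerParams, "zk-host", "2181")
> println(rowKeys.count())  // prints 0 on spark-3.2.0; non-zero on spark-3.1.1
> {code}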
>
> If we fetch using the Scan directly, without newAPIHadoopRDD, it works.
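>
> A minimal sketch of that direct-scan path, assuming the table name, column family, and qualifier from the snippet above (the ZooKeeper connection settings here are illustrative, not our exact configuration):
>
> {code:java}
> import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
> import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
> import org.apache.hadoop.hbase.util.Bytes
> import scala.collection.JavaConverters._
>
> // Hypothetical direct-scan version: reads row keys through the HBase
> // client API instead of newAPIHadoopRDD.
> def fetchRowKeysDirectly(zkIP: String, zkPort: String): Seq[String] = {
>   val conf = HBaseConfiguration.create()
>   conf.set("hbase.zookeeper.quorum", zkIP)
>   conf.set("hbase.zookeeper.property.clientPort", zkPort)
>   val connection = ConnectionFactory.createConnection(conf)
>   try {
>     val table = connection.getTable(TableName.valueOf("myTable"))
>     val scan = new Scan
>     scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("time"))
>     val scanner = table.getScanner(scan)
>     try scanner.asScala.map(r => Bytes.toString(r.getRow)).toList
>     finally scanner.close()
>   } finally connection.close()
> }
> {code}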
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org