You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Bhavya Raj Sharma (Jira)" <ji...@apache.org> on 2021/12/16 07:50:00 UTC

[jira] [Created] (SPARK-37660) Spark-3.2.0 Fetch Hbase Data not working

Bhavya Raj Sharma created SPARK-37660:
-----------------------------------------

             Summary: Spark-3.2.0 Fetch Hbase Data not working
                 Key: SPARK-37660
                 URL: https://issues.apache.org/jira/browse/SPARK-37660
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.2.0
         Environment: Hadoop version : hadoop-2.9.2

HBase version : hbase-2.2.5

Spark version : spark-3.2.0-bin-without-hadoop

java version : jdk1.8.0_151

scala version : scala-sdk-2.12.10

os version : Red Hat Enterprise Linux Server release 6.6 (Santiago)
            Reporter: Bhavya Raj Sharma


Below is the sample code snipet that is used to fetch data from hbase. This used to work fine with spark-3.1.1

However after upgrading to psark-3.2.0 it is not working, The issue is it is not throwing any exception, it just don't fill RDD.

 
{code:java}
 
   def getInfo(sc: SparkContext, startDate:String, cachingValue: Int, sparkLoggerParams: SparkLoggerParams, zkIP: String, zkPort: String): RDD[(String)] = {{
val scan = new Scan
    scan.addFamily("family")
    scan.addColumn("family","time")
    val rdd = getHbaseConfiguredRDDFromScan(sc, zkIP, zkPort, "myTable", scan, cachingValue, sparkLoggerParams)
    val output: RDD[(String)] = rdd.map { row =>
      (Bytes.toString(row._2.getRow))
    }
    output
  }
 
def getHbaseConfiguredRDDFromScan(sc: SparkContext, zkIP: String, zkPort: String, tableName: String,
                                    scan: Scan, cachingValue: Int, sparkLoggerParams: SparkLoggerParams): NewHadoopRDD[ImmutableBytesWritable, Result] = {
    scan.setCaching(cachingValue)
    val scanString = Base64.getEncoder.encodeToString(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(scan).toByteArray)
    val hbaseContext = new SparkHBaseContext(zkIP, zkPort)
    val hbaseConfig = hbaseContext.getConfiguration()
    hbaseConfig.set(TableInputFormat.INPUT_TABLE, tableName)
    hbaseConfig.set(TableInputFormat.SCAN, scanString)
    sc.newAPIHadoopRDD(
      hbaseConfig,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result]
    ).asInstanceOf[NewHadoopRDD[ImmutableBytesWritable, Result]]
  }
 
{code}
 

If we fetch with using scan directly without using newAPIHadoopRDD, it works.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org