Posted to issues@spark.apache.org by "Harendra Singh (JIRA)" <ji...@apache.org> on 2017/11/03 09:51:00 UTC

[jira] [Created] (SPARK-22434) Spark structured streaming with HBase

Harendra Singh created SPARK-22434:
--------------------------------------

             Summary: Spark structured streaming with HBase
                 Key: SPARK-22434
                 URL: https://issues.apache.org/jira/browse/SPARK-22434
             Project: Spark
          Issue Type: Task
          Components: Structured Streaming
    Affects Versions: 2.1.2
            Reporter: Harendra Singh
            Priority: Blocker
             Fix For: 2.0.3


Hi Team,

We are streaming Kafka data that is being collected from MySQL.

Once all the analytics are done, I want to save the data directly to HBase.

I have gone through the Spark Structured Streaming documentation but couldn't find any sink for HBase.
====
The code snippet I used to read the data from Kafka is below.
==

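import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._  // enables the $"col" syntax (already imported in spark-shell)

// Read the Kafka topic as a stream, starting from the earliest available offset.
val records = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "XX.XX.XX.XX:6667").option("subscribe", "kaapociot").option("startingOffsets", "earliest").load()

// Outer JSON envelope: the "event" field carries the payload as a JSON string.
val jsonschema = StructType(Seq(StructField("header", StringType, true), StructField("event", StringType, true)))

// Schema of the payload stored inside "event".
val uschema = StructType(Seq(
  StructField("MeterNumber", StringType, true),
  StructField("Utility", StringType, true),
  StructField("VendorServiceNumber", StringType, true),
  StructField("VendorName", StringType, true),
  StructField("SiteNumber", StringType, true),
  StructField("SiteName", StringType, true),
  StructField("Location", StringType, true),
  StructField("timestamp", LongType, true),
  StructField("power", DoubleType, true)
))

// Parse the envelope, then parse the nested payload and flatten it into top-level columns.
val events = records.selectExpr("CAST(value AS STRING) AS json").select(from_json($"json", jsonschema).as("data")).select("data.event")
val DF_Hbase = events.select(from_json($"event", uschema).as("mykafkadata")).select("mykafkadata.*")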
===
Finally, I want to save the DF_Hbase DataFrame to HBase.
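
For reference, here is a minimal sketch of one direction that might work: Structured Streaming's generic foreach sink with a custom ForeachWriter that issues HBase Puts. It is not an official HBase sink. It assumes the HBase client library is on the executor classpath and that an hbase-site.xml is visible to it. The table name "meter_readings", the column family "d", the row-key layout, and the names HBaseSinkSketch and HBaseForeachWriter are all hypothetical.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical settings; adjust to the real table layout.
object HBaseSinkSketch {
  val tableName = "meter_readings"
  val family: Array[Byte] = Bytes.toBytes("d")

  // Hypothetical row key: meter number, then the timestamp zero-padded to 19 digits
  // so that keys for one meter sort chronologically.
  def rowKey(meterNumber: String, timestamp: Long): String =
    "%s#%019d".format(meterNumber, timestamp)
}

class HBaseForeachWriter extends ForeachWriter[Row] {
  // Opened on the executor in open(), so no connection object is serialized from the driver.
  @transient private var connection: Connection = _
  @transient private var table: Table = _

  override def open(partitionId: Long, version: Long): Boolean = {
    connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    table = connection.getTable(TableName.valueOf(HBaseSinkSketch.tableName))
    true // process every partition of every batch
  }

  override def process(row: Row): Unit = {
    val key = HBaseSinkSketch.rowKey(
      row.getAs[String]("MeterNumber"), row.getAs[Long]("timestamp"))
    val put = new Put(Bytes.toBytes(key))
    // Only "power" is written here; other columns would be added the same way.
    put.addColumn(HBaseSinkSketch.family, Bytes.toBytes("power"),
      Bytes.toBytes(row.getAs[Double]("power")))
    table.put(put)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}
```

Under these assumptions the stream could be started with DF_Hbase.na.drop(Seq("MeterNumber", "timestamp", "power")).writeStream.foreach(new HBaseForeachWriter).start(), where the na.drop call skips rows whose key or value fields are null. Because a Put with the same row key overwrites the same cell, replayed batches should not create duplicate rows.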

Please help me out.

Thanks 
Harendra Singh



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
