Posted to user@spark.apache.org by Kuttaiah Robin <ku...@gmail.com> on 2018/11/01 06:15:44 UTC

How to use Dataset foreachPartition and groupByKey together

Hello all,

I am using Spark 2.3.0 and Hadoop 2.7.4.
I have a Spark Structured Streaming application that listens to a Kafka topic,
applies some transformations, and writes to an Oracle database using a JDBC client.


Step 1.
Read events from Kafka as shown below:
--------------------------------------
    Dataset<Row> kafkaEvents = getSparkSession().readStream().format("kafka")
        .option("kafka.bootstrap.servers", strKafkaAddress)
        .option("assign", strSubscription)          // JSON map of topic -> partitions
        .option("maxOffsetsPerTrigger", "100000")   // cap on offsets read per trigger
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", false)
        .load()
        .filter(strFilter)
        // Parse the Kafka value (JSON) with oSchema and flatten it into columns
        .select(functions.from_json(functions.col("value").cast("string"),
            oSchema).alias("events"))
        .select("events.*");

I call groupByKey and then, for each group, take the set of events in that
group, create a JDBC connection and PreparedStatement, insert the events, and
close the connection.
I am using the Oracle JDBC driver together with flatMapGroupsWithState.


Step 2.
groupByKey and flatMapGroupsWithState
-------------------------------------
    Dataset<InsightEventUpdate> sessionUpdates = kafkaEvents
        .groupByKey(
            new MapFunction<Row, String>() {
                @Override
                public String call(Row event) {
                    // Key each event by its primary-key column
                    return event.getAs(m_InsightRawEvent.getPrimaryKey());
                }
            }, Encoders.STRING())
        .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
            Encoders.bean(InsightEventInfo.class),    // state encoder
            Encoders.bean(InsightEventUpdate.class),  // output encoder
            GroupStateTimeout.ProcessingTimeTimeout());


The drawback is that a new connection is created, and a separate insert is
issued, for every group.

I need to do this per partition instead, so that only one connection is
opened and a single batch insert is done for all the new events read from
that partition.
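For reference, the batch insert itself needs only plain java.sql: one PreparedStatement for all rows, addBatch() per row, then a single executeBatch() and one commit. This is a minimal sketch, assuming a hypothetical table INSIGHT_EVENT (EVENT_ID, PAYLOAD) and representing each event as a hypothetical {eventId, payload} string pair; the real columns would come from InsightEventUpdate.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

final class BatchInsert {

    // Hypothetical table and columns; replace with the real Oracle schema.
    static final String INSERT_SQL =
        "INSERT INTO INSIGHT_EVENT (EVENT_ID, PAYLOAD) VALUES (?, ?)";

    private BatchInsert() {}

    // Inserts all rows ({eventId, payload}) over the caller's connection using one
    // PreparedStatement and one executeBatch() call, committed as one transaction.
    // The caller owns the connection and is responsible for closing it.
    static int[] insertAll(Connection conn, List<String[]> rows) throws SQLException {
        if (rows.isEmpty()) {
            return new int[0];                      // nothing to send
        }
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);                  // one transaction for the whole batch
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            for (String[] row : rows) {
                ps.setString(1, row[0]);
                ps.setString(2, row[1]);
                ps.addBatch();                      // queued client-side, not yet sent
            }
            int[] counts = ps.executeBatch();       // all queued rows in one call
            conn.commit();
            return counts;
        } catch (SQLException e) {
            conn.rollback();                        // discard the partial batch
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```

Whatever opens the connection (once per partition) would call insertAll once with every event collected for that partition.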

Can somebody point me to how I can achieve this?

Basically, I am looking for the following:
1. Read from Kafka as described above.
2. kafkaEvents.foreachPartition: create one connection here.
3. groupByKey and flatMapGroupsWithState.
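One possible route in Spark 2.3: foreachPartition cannot be called directly on a streaming Dataset such as kafkaEvents (Spark rejects batch actions on streaming sources and asks for writeStream.start() instead), but the foreach sink offers a per-partition hook. A ForeachWriter<T> attached to sessionUpdates.writeStream().foreach(...) gets open(partitionId, version) once per partition per trigger, process(value) for each row, and close(errorOrNull) at the end, so one connection and one batch per partition follow naturally, after groupByKey/flatMapGroupsWithState rather than before. The sketch below mirrors that three-method lifecycle but, to stay self-contained, does not extend Spark's ForeachWriter; the ConnectionFactory interface, the INSIGHT_EVENT table, and the String[] row type (standing in for InsightEventUpdate) are all hypothetical.

```java
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Mirrors the ForeachWriter<T> lifecycle without depending on Spark. In a real job this
// would be declared "extends ForeachWriter<InsightEventUpdate>" (assumption, not shown).
class PartitionJdbcWriter implements Serializable {

    // Hypothetical factory; serializable because the writer is shipped to executors.
    interface ConnectionFactory extends Serializable {
        Connection open() throws SQLException;
    }

    // Hypothetical table and columns.
    static final String INSERT_SQL =
        "INSERT INTO INSIGHT_EVENT (EVENT_ID, PAYLOAD) VALUES (?, ?)";

    private final ConnectionFactory factory;
    private transient Connection conn;       // one connection per partition per trigger
    private transient PreparedStatement ps;  // reused for every row of the partition

    PartitionJdbcWriter(ConnectionFactory factory) {
        this.factory = factory;
    }

    public boolean open(long partitionId, long version) {
        try {
            conn = factory.open();           // created on the executor, not the driver
            conn.setAutoCommit(false);
            ps = conn.prepareStatement(INSERT_SQL);
            return true;                     // true: rows of this partition get processed
        } catch (SQLException e) {
            closeQuietly();                  // don't leak a half-opened connection
            throw new RuntimeException(e);
        }
    }

    public void process(String[] row) {      // hypothetical row: {eventId, payload}
        try {
            ps.setString(1, row[0]);
            ps.setString(2, row[1]);
            ps.addBatch();                   // queued client-side
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public void close(Throwable errorOrNull) {
        if (conn == null) {
            return;                          // nothing was opened
        }
        try {
            if (errorOrNull == null) {
                ps.executeBatch();           // one batch for the whole partition
                conn.commit();
            } else {
                conn.rollback();             // processing failed: discard queued rows
            }
        } catch (SQLException e) {
            try { conn.rollback(); } catch (SQLException ignored) { }
            throw new RuntimeException(e);
        } finally {
            closeQuietly();
        }
    }

    private void closeQuietly() {
        try { if (ps != null) ps.close(); } catch (SQLException ignored) { }
        try { if (conn != null) conn.close(); } catch (SQLException ignored) { }
        ps = null;
        conn = null;
    }
}
```

In a real job the Spark version of this writer would be started with sessionUpdates.writeStream().outputMode("append").foreach(writer).start(), and idstateUpdateFunction would then only emit InsightEventUpdate records instead of opening connections itself. Because the writer runs after the groupByKey shuffle, "per partition" here means per shuffle partition (spark.sql.shuffle.partitions, 200 by default), not per Kafka partition.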

thanks
Robin Kuttaiah