Posted to user@spark.apache.org by karan alang <ka...@gmail.com> on 2022/02/18 00:21:22 UTC

writing a Dataframe (with one of the columns as struct) into Kafka

Hello All,
I have a PySpark DataFrame that I need to write to a Kafka topic.

The structure of the DataFrame is:

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = false)
 |    |-- end: timestamp (nullable = false)
 |-- processedAlarmCnt: integer (nullable = false)
 |-- totalAlarmCnt: integer (nullable = false)

Currently, I'm looping over the rows, adding the data to a hashmap,
and then using KafkaProducer to push the data into the Kafka topic.

This does not seem very efficient, since I'm looping over each row
and using extra space as well.
What is the best way to design/code this?

Current code:

import json
from datetime import datetime

def writeCountToKafka(df):
    if df.count() > 0:
        hm = {}
        df_pandas = df.toPandas()
        for _, row in df_pandas.iterrows():
            # flatten the window struct into [start, end] epoch timestamps
            hm["window"] = [datetime.timestamp(row["window"]["start"]),
                            datetime.timestamp(row["window"]["end"])]
            hm["processedAlarmCnt"] = row["processedAlarmCnt"]
            hm["totalAlarmCnt"] = row["totalAlarmCnt"]

            # Python Kafka Producer
            kafka_producer.send(topic_count,
                                json.dumps(hm).encode('utf-8'))
            kafka_producer.flush()
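
For comparison, here is a minimal sketch (not my current code) of writing
the DataFrame directly with Spark's built-in Kafka sink instead of looping
on the driver. It serializes each row to JSON in a "value" column and writes
the whole DataFrame in one pass. kafka_bootstrap_servers and topic_count are
placeholders for the broker address and topic; for a streaming DataFrame the
write would go through writeStream with a checkpointLocation instead.

from pyspark.sql.functions import to_json, struct, col

# Build a "value" column containing each row as a JSON string,
# flattening the nested window struct into start/end fields.
kafka_df = df.select(
    to_json(
        struct(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("processedAlarmCnt"),
            col("totalAlarmCnt"),
        )
    ).alias("value")
)

# Write the whole DataFrame to Kafka in one pass (no driver-side loop).
(kafka_df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("topic", topic_count)
    .save())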


More details are on Stack Overflow:

https://stackoverflow.com/questions/71166560/structured-streaming-writing-dataframe-into-kafka-row-by-row-dataframe-has-a

tia !