Posted to user@spark.apache.org by Puneet Tripathi <Pu...@dunnhumby.com> on 2016/07/21 10:46:49 UTC

writing Kafka dstream to local flat file

Hi, I am trying to consume from Kafka topics following Approach 1 (createStream) from http://spark.apache.org/docs/latest/streaming-kafka-integration.html, but I am not able to write the stream to a local text file using the saveAsTextFiles() function. Below is the code:

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)
zkQuorum, topic = 'localhost:9092', 'python-kafka'

kafka_stream = KafkaUtils.createStream(ssc, zkQuorum,None, {topic: 1})
lines = kafka_stream.map(lambda x: x[1])

kafka_stream.saveAsTextFiles('file:///home/puneett/')
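For comparison, here is a minimal sketch of how the same pieces might fit together in a complete job. It is not a confirmed fix, and it rests on several assumptions: that ZooKeeper listens on localhost:2182 (the address passed to the console consumer in this post, whereas 9092 is normally the Kafka broker port), that the mapped `lines` stream is the one meant to be saved, and that the streaming context has to be started before anything is written. The group id, the `kafka` output name, and the helpers `extract_value`, `output_prefix` and `run` are hypothetical names introduced only for illustration. `pyspark.streaming.kafka` exists only in older Spark releases.

```python
def extract_value(record):
    """Records from createStream are (key, value) tuples; keep the value."""
    return record[1]


def output_prefix(directory, name="kafka"):
    """Build a file:// prefix for saveAsTextFiles.

    saveAsTextFiles treats its argument as a prefix and writes one
    directory per batch, named <prefix>-<batch time in ms>.
    """
    return "file://" + directory.rstrip("/") + "/" + name


def run(sc,
        zk_quorum="localhost:2182",            # assumed ZooKeeper address
        topic="python-kafka",
        group_id="spark-streaming-consumer",   # hypothetical group id
        out_dir="/home/puneett"):
    # Imported here so the helpers above can be used without Spark installed.
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    ssc = StreamingContext(sc, 1)  # 1-second batches, as in the post
    kafka_stream = KafkaUtils.createStream(ssc, zk_quorum, group_id, {topic: 1})

    # Save the message values rather than the raw (key, value) stream.
    lines = kafka_stream.map(extract_value)
    lines.saveAsTextFiles(output_prefix(out_dir))

    # No batch is processed until the context is started.
    ssc.start()
    ssc.awaitTermination()
```

From a pyspark shell this would be called as `run(sc)`. When running locally, a receiver-based stream needs a master with at least two threads (for example `local[2]`), since the receiver occupies one of them.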

When I run the console consumer, I get the following output:

[puneett@gb-slo-svb-0255 ~]$ /nfs/science/shared/kafka/kafka/bin/kafka-console-consumer.sh --topic python-kafka --property schema.registry.url="http://localhost:9092" --zookeeper localhost:2182 --from-beginning
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/core/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/tools/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There

Could someone please suggest what I am doing wrong?
Regards,
Puneet