Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/09/06 14:47:22 UTC

[jira] [Resolved] (SPARK-17411) Cannot set fromOffsets in createDirectStream function

     [ https://issues.apache.org/jira/browse/SPARK-17411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-17411.
-------------------------------
    Resolution: Duplicate

> Cannot set fromOffsets in createDirectStream function
> -----------------------------------------------------
>
>                 Key: SPARK-17411
>                 URL: https://issues.apache.org/jira/browse/SPARK-17411
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>    Affects Versions: 2.0.0
>            Reporter: Piotr Milanowski
>
> I am trying to create a kafka direct stream with a custom starting offset:
> {code}
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
> streaming_ctx = StreamingContext(sc, 300)
> topic = TopicAndPartition("my-kafka-topic", 1)
> d_stream = KafkaUtils.createDirectStream(streaming_ctx, ["my-kafka-topic"], {"metadata.broker.list": "kafka.server.com:9092"}, fromOffsets={topic: 123445})
> {code}
> This code snippet, run in the pyspark shell (with a proper topic name and broker address, obviously), fails with a casting error caused by the _fromOffsets_ argument:
> {code}
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "spark-2.0/python/pyspark/streaming/kafka.py", line 130, in createDirectStream
>     ssc._jssc, kafkaParams, set(topics), jfromOffsets)
>   File "spark-2.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
>   File "spark-2.0/python/pyspark/sql/utils.py", line 63, in deco
>     return f(*a, **kw)
>   File "spark-2.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o57.createDirectStreamWithoutMessageHandler.
> : java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
> 	at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper$$anonfun$17.apply(KafkaUtils.scala:717)
> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> 	at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> 	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableOnce$class.copyToBuffer(TraversableOnce.scala:275)
> 	at scala.collection.AbstractTraversable.copyToBuffer(Traversable.scala:104)
> 	at scala.collection.MapLike$class.toBuffer(MapLike.scala:326)
> 	at scala.collection.AbstractMap.toBuffer(Map.scala:59)
> 	at scala.collection.MapLike$class.toSeq(MapLike.scala:323)
> 	at scala.collection.AbstractMap.toSeq(Map.scala:59)
> 	at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:717)
> 	at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 	at py4j.Gateway.invoke(Gateway.java:280)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.GatewayConnection.run(GatewayConnection.java:211)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> Either this is a bug, or the documentation of the Kafka streaming API should be more precise about what can be passed as _fromOffsets_.
> What should be passed as _fromOffsets_?
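> For reference, a minimal workaround sketch, assuming Py4J sends small Python ints to the JVM as java.lang.Integer while Python 2 {{long}} values arrive as java.lang.Long (the type the Scala helper casts to). This is not a confirmed fix, and it does not help on Python 3, where the {{long}} builtin no longer exists:
> {code}
> # Hedged workaround sketch (Python 2 only, unverified against 2.0.0): wrap the
> # offset in long() so Py4J hands the JVM a java.lang.Long instead of the
> # java.lang.Integer that triggers the ClassCastException above.
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
>
> streaming_ctx = StreamingContext(sc, 300)  # sc provided by the pyspark shell
> topic = TopicAndPartition("my-kafka-topic", 1)
> d_stream = KafkaUtils.createDirectStream(
>     streaming_ctx,
>     ["my-kafka-topic"],
>     {"metadata.broker.list": "kafka.server.com:9092"},
>     fromOffsets={topic: long(123445)})  # no long() builtin exists in Python 3
> {code}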



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
