Posted to issues@spark.apache.org by "Daniel Nuriyev (JIRA)" <ji...@apache.org> on 2017/03/20 18:11:42 UTC

[jira] [Created] (SPARK-20037) impossible to set kafka offsets using kafka 0.10 and spark 2.0.0

Daniel Nuriyev created SPARK-20037:
--------------------------------------

             Summary: impossible to set kafka offsets using kafka 0.10 and spark 2.0.0
                 Key: SPARK-20037
                 URL: https://issues.apache.org/jira/browse/SPARK-20037
             Project: Spark
          Issue Type: Bug
          Components: Input/Output
    Affects Versions: 2.0.0
            Reporter: Daniel Nuriyev
            Priority: Critical
             Fix For: 2.0.3


I use Kafka 0.10.1 and Java code with the following Maven dependencies:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.0.0</version>
</dependency>

The code reads a topic starting from explicit offsets.
The topic has 4 partitions, each of which starts somewhere before offset 3000000 and ends after it, so I wanted to read all partitions starting at offset 3000000:

Map<TopicPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicPartition(topic, 0), 3000000L);
fromOffsets.put(new TopicPartition(topic, 1), 3000000L);
fromOffsets.put(new TopicPartition(topic, 2), 3000000L);
fromOffsets.put(new TopicPartition(topic, 3), 3000000L);
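The four `put(...)` calls above can be generated in a loop, and each starting offset can be checked against its partition's earliest and latest offsets before the stream is created. This is a minimal stdlib-only sketch: integer partition numbers stand in for `TopicPartition`, and the `earliest`/`latest` maps are hypothetical inputs. In real code they might come from a `KafkaConsumer`'s `beginningOffsets` and `endOffsets` methods (available in kafka-clients 0.10.1). The class and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

final class StartOffsets {
    // Builds partition -> starting offset, mirroring the put(...) calls above.
    // Integer keys stand in for org.apache.kafka.common.TopicPartition (assumption, to stay stdlib-only).
    static Map<Integer, Long> uniformStartOffsets(int numPartitions, long startOffset) {
        Map<Integer, Long> fromOffsets = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            fromOffsets.put(p, startOffset);
        }
        return fromOffsets;
    }

    // Returns true only if every start offset lies in [earliest, latest] for its partition.
    static boolean allWithinRange(Map<Integer, Long> fromOffsets,
                                  Map<Integer, Long> earliest,
                                  Map<Integer, Long> latest) {
        for (Map.Entry<Integer, Long> e : fromOffsets.entrySet()) {
            Long lo = earliest.get(e.getKey());
            Long hi = latest.get(e.getKey());
            if (lo == null || hi == null) {
                return false; // no bounds known for this partition
            }
            long start = e.getValue();
            if (start < lo || start > hi) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Long> fromOffsets = uniformStartOffsets(4, 3000000L);
        // Hypothetical bounds for the 4 partitions.
        Map<Integer, Long> earliest = uniformStartOffsets(4, 2500000L);
        Map<Integer, Long> latest = uniformStartOffsets(4, 3500000L);
        System.out.println(allWithinRange(fromOffsets, earliest, latest)); // prints true
    }
}
```

A start equal to the latest offset is treated as valid here, since it simply yields an empty first batch.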

Using 5-second batches:
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

The code immediately throws:
numRecords must not be negative
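A minimal sketch of how this message can arise, assuming that a batch's record count is computed as the sum of `untilOffset - fromOffset` over the per-partition offset ranges (loosely modeled on Spark's `OffsetRange`; not checked against the 2.0.0 source). `OffsetRangeSketch`, `numRecords` and `requireNonNegative` are hypothetical names, and the end offsets below are made-up values.

```java
import java.util.Arrays;
import java.util.List;

final class BatchRecordCount {
    // Hypothetical stand-in for one partition's offset range in a batch: [fromOffset, untilOffset).
    static final class OffsetRangeSketch {
        final int partition;
        final long fromOffset;
        final long untilOffset;

        OffsetRangeSketch(int partition, long fromOffset, long untilOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        // Record count for this partition; negative when the start lies past the end.
        long count() {
            return untilOffset - fromOffset;
        }
    }

    // Assumed batch record count: the sum of the per-partition counts.
    static long numRecords(List<OffsetRangeSketch> ranges) {
        long total = 0L;
        for (OffsetRangeSketch r : ranges) {
            total += r.count();
        }
        return total;
    }

    // Mimics the reported check; the exception type here is an assumption.
    static void requireNonNegative(long numRecords) {
        if (numRecords < 0) {
            throw new IllegalArgumentException("numRecords must not be negative");
        }
    }

    public static void main(String[] args) {
        // Hypothetical end offsets: partition 3 reports an end offset below the requested start.
        List<OffsetRangeSketch> ranges = Arrays.asList(
            new OffsetRangeSketch(0, 3000000L, 3000100L),
            new OffsetRangeSketch(1, 3000000L, 3000100L),
            new OffsetRangeSketch(2, 3000000L, 3000100L),
            new OffsetRangeSketch(3, 3000000L, 2999000L));
        long n = numRecords(ranges);
        System.out.println(n); // prints -700
        try {
            requireNonNegative(n);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "numRecords must not be negative"
        }
    }
}
```

Under this assumption, one partition whose observed end offset is below 3000000 is enough to make the batch total negative, even if the other partitions are fine.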

I use this as a base: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
but create the direct stream with explicit starting offsets:
JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams, fromOffsets)
    );

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)