You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Muhammad Haseeb Javed <11...@seecs.edu.pk> on 2017/02/20 18:06:59 UTC

Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

I am PhD student at Ohio State working on a study to evaluate streaming
frameworks (Spark Streaming, Storm, Flink) using the the Intel HiBench
benchmarks. But I think I am having a problem  with Spark. I have Spark
Streaming application which I am trying to run on a 5 node cluster
(including master). I have 2 zookeeper and 4 kafka brokers. However,
whenever I run a Spark Streaming application I encounter the following
error:

java.lang.IllegalArgumentException: requirement failed: numRecords
must not be negative
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
        at scala.Option.orElse(Option.scala:289)

The application starts fine, but as soon as the Kafka producers start
emitting the stream data I start receiving the aforementioned error
repeatedly.

I have tried removing Spark Streaming checkpointing files as has been
suggested in similar posts on the internet. However, the problem persists
even if I start a Kafka topic and its corresponding consumer Spark
Streaming application for the first time. Also the problem could not be
offset related as I start the topic for the first time.
Although the application seems to be processing the stream properly as I
can see by the benchmark numbers generated. However, the numbers are way of
from what I got for Storm and Flink, suspecting me to believe that there is
something wrong with the pipeline and Spark is not able to process the
stream as cleanly as it should. Any help in this regard would be really
appreciated.

Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

Posted by Muhammad Haseeb Javed <11...@seecs.edu.pk>.
I was talking about the Kafka binary if using to run the Kafka server
(broker) with. The version for that binary is kafka_2.10-0.8.2.1 and Spark
is 2.0.2 is built with 2.11. So I am using the Kafka Connector that Spark
is using internally to communicate with the broker is also built with Scala
2.11. So can this version mismatch be the cause of the issue?

On Wed, Feb 22, 2017 at 8:44 PM, Cody Koeninger <co...@koeninger.org> wrote:

> If you're talking about the version of scala used to build the broker,
> that shouldn't matter.
> If you're talking about the version of scala used for the kafka client
> dependency, it shouldn't have compiled at all to begin with.
>
> On Wed, Feb 22, 2017 at 12:11 PM, Muhammad Haseeb Javed
> <11...@seecs.edu.pk> wrote:
> > I just noticed that Spark version that I am using (2.0.2) is built with
> > Scala 2.11. However I am using Kafka 0.8.2 built with Scala 2.10. Could
> this
> > be the reason why we are getting this error?
> >
> > On Mon, Feb 20, 2017 at 5:50 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
> >>
> >> So there's no reason to use checkpointing at all, right?  Eliminate
> >> that as a possible source of problems.
> >>
> >> Probably unrelated, but this also isn't a very good way to benchmark.
> >> Kafka producers are threadsafe, there's no reason to create one for
> >> each partition.
> >>
> >> On Mon, Feb 20, 2017 at 4:43 PM, Muhammad Haseeb Javed
> >> <11...@seecs.edu.pk> wrote:
> >> > This is the code that I have been trying is giving me this error. No
> >> > complicated operation being performed on the topics as far as I can
> see.
> >> >
> >> > class Identity() extends BenchBase {
> >> >
> >> >
> >> >   override def process(lines: DStream[(Long, String)], config:
> >> > SparkBenchConfig): Unit = {
> >> >
> >> >     val reportTopic = config.reporterTopic
> >> >
> >> >     val brokerList = config.brokerList
> >> >
> >> >
> >> >     lines.foreachRDD(rdd => rdd.foreachPartition( partLines => {
> >> >
> >> >       val reporter = new KafkaReporter(reportTopic, brokerList)
> >> >
> >> >       partLines.foreach{ case (inTime , content) =>
> >> >
> >> >         val outTime = System.currentTimeMillis()
> >> >
> >> >         reporter.report(inTime, outTime)
> >> >
> >> >         if(config.debugMode) {
> >> >
> >> >           println("Event: " + inTime + ", " + outTime)
> >> >
> >> >         }
> >> >
> >> >       }
> >> >
> >> >     }))
> >> >
> >> >   }
> >> >
> >> > }
> >> >
> >> >
> >> > On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger <co...@koeninger.org>
> >> > wrote:
> >> >>
> >> >> That's an indication that the beginning offset for a given batch is
> >> >> higher than the ending offset, i.e. something is seriously wrong.
> >> >>
> >> >> Are you doing anything at all odd with topics, i.e. deleting and
> >> >> recreating them, using compacted topics, etc?
> >> >>
> >> >> Start off with a very basic stream over the same kafka topic that
> just
> >> >> does foreach println or similar, with no checkpointing at all, and
> get
> >> >> that working first.
> >> >>
> >> >> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
> >> >> <11...@seecs.edu.pk> wrote:
> >> >> > Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10
> >> >> >
> >> >> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
> >> >> > <11...@seecs.edu.pk> wrote:
> >> >> >>
> >> >> >> I am PhD student at Ohio State working on a study to evaluate
> >> >> >> streaming
> >> >> >> frameworks (Spark Streaming, Storm, Flink) using the the Intel
> >> >> >> HiBench
> >> >> >> benchmarks. But I think I am having a problem  with Spark. I have
> >> >> >> Spark
> >> >> >> Streaming application which I am trying to run on a 5 node cluster
> >> >> >> (including master). I have 2 zookeeper and 4 kafka brokers.
> However,
> >> >> >> whenever I run a Spark Streaming application I encounter the
> >> >> >> following
> >> >> >> error:
> >> >> >>
> >> >> >> java.lang.IllegalArgumentException: requirement failed:
> numRecords
> >> >> >> must
> >> >> >> not be negative
> >> >> >>         at scala.Predef$.require(Predef.scala:224)
> >> >> >>         at
> >> >> >>
> >> >> >>
> >> >> >> org.apache.spark.streaming.scheduler.StreamInputInfo.<
> init>(InputInfoTracker.scala:38)
> >> >> >>         at
> >> >> >>
> >> >> >>
> >> >> >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(
> DirectKafkaInputDStream.scala:165)
> >> >> >>         at
> >> >> >>
> >> >> >>
> >> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >> >>         at
> >> >> >>
> >> >> >>
> >> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >> >>         at
> >> >> >> scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >> >> >>         at
> >> >> >>
> >> >> >>
> >> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >> >>         at
> >> >> >>
> >> >> >>
> >> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >> >>         at
> >> >> >>
> >> >> >>
> >> >> >> org.apache.spark.streaming.dstream.DStream.
> createRDDWithLocalProperties(DStream.scala:415)
> >> >> >>         at
> >> >> >>
> >> >> >>
> >> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:335)
> >> >> >>         at
> >> >> >>
> >> >> >>
> >> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:333)
> >> >> >>         at scala.Option.orElse(Option.scala:289)
> >> >> >>
> >> >> >> The application starts fine, but as soon as the Kafka producers
> >> >> >> start
> >> >> >> emitting the stream data I start receiving the aforementioned
> error
> >> >> >> repeatedly.
> >> >> >>
> >> >> >> I have tried removing Spark Streaming checkpointing files as has
> >> >> >> been
> >> >> >> suggested in similar posts on the internet. However, the problem
> >> >> >> persists
> >> >> >> even if I start a Kafka topic and its corresponding consumer Spark
> >> >> >> Streaming
> >> >> >> application for the first time. Also the problem could not be
> offset
> >> >> >> related
> >> >> >> as I start the topic for the first time.
> >> >> >>
> >> >> >> Although the application seems to be processing the stream
> properly
> >> >> >> as
> >> >> >> I
> >> >> >> can see by the benchmark numbers generated. However, the numbers
> are
> >> >> >> way of
> >> >> >> from what I got for Storm and Flink, suspecting me to believe that
> >> >> >> there is
> >> >> >> something wrong with the pipeline and Spark is not able to process
> >> >> >> the
> >> >> >> stream as cleanly as it should. Any help in this regard would be
> >> >> >> really
> >> >> >> appreciated.
> >> >> >
> >> >> >
> >> >
> >> >
> >
> >
>

Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

Posted by Cody Koeninger <co...@koeninger.org>.
If you're talking about the version of scala used to build the broker,
that shouldn't matter.
If you're talking about the version of scala used for the kafka client
dependency, it shouldn't have compiled at all to begin with.

On Wed, Feb 22, 2017 at 12:11 PM, Muhammad Haseeb Javed
<11...@seecs.edu.pk> wrote:
> I just noticed that Spark version that I am using (2.0.2) is built with
> Scala 2.11. However I am using Kafka 0.8.2 built with Scala 2.10. Could this
> be the reason why we are getting this error?
>
> On Mon, Feb 20, 2017 at 5:50 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> So there's no reason to use checkpointing at all, right?  Eliminate
>> that as a possible source of problems.
>>
>> Probably unrelated, but this also isn't a very good way to benchmark.
>> Kafka producers are threadsafe, there's no reason to create one for
>> each partition.
>>
>> On Mon, Feb 20, 2017 at 4:43 PM, Muhammad Haseeb Javed
>> <11...@seecs.edu.pk> wrote:
>> > This is the code that I have been trying is giving me this error. No
>> > complicated operation being performed on the topics as far as I can see.
>> >
>> > class Identity() extends BenchBase {
>> >
>> >
>> >   override def process(lines: DStream[(Long, String)], config:
>> > SparkBenchConfig): Unit = {
>> >
>> >     val reportTopic = config.reporterTopic
>> >
>> >     val brokerList = config.brokerList
>> >
>> >
>> >     lines.foreachRDD(rdd => rdd.foreachPartition( partLines => {
>> >
>> >       val reporter = new KafkaReporter(reportTopic, brokerList)
>> >
>> >       partLines.foreach{ case (inTime , content) =>
>> >
>> >         val outTime = System.currentTimeMillis()
>> >
>> >         reporter.report(inTime, outTime)
>> >
>> >         if(config.debugMode) {
>> >
>> >           println("Event: " + inTime + ", " + outTime)
>> >
>> >         }
>> >
>> >       }
>> >
>> >     }))
>> >
>> >   }
>> >
>> > }
>> >
>> >
>> > On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger <co...@koeninger.org>
>> > wrote:
>> >>
>> >> That's an indication that the beginning offset for a given batch is
>> >> higher than the ending offset, i.e. something is seriously wrong.
>> >>
>> >> Are you doing anything at all odd with topics, i.e. deleting and
>> >> recreating them, using compacted topics, etc?
>> >>
>> >> Start off with a very basic stream over the same kafka topic that just
>> >> does foreach println or similar, with no checkpointing at all, and get
>> >> that working first.
>> >>
>> >> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
>> >> <11...@seecs.edu.pk> wrote:
>> >> > Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10
>> >> >
>> >> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
>> >> > <11...@seecs.edu.pk> wrote:
>> >> >>
>> >> >> I am PhD student at Ohio State working on a study to evaluate
>> >> >> streaming
>> >> >> frameworks (Spark Streaming, Storm, Flink) using the the Intel
>> >> >> HiBench
>> >> >> benchmarks. But I think I am having a problem  with Spark. I have
>> >> >> Spark
>> >> >> Streaming application which I am trying to run on a 5 node cluster
>> >> >> (including master). I have 2 zookeeper and 4 kafka brokers. However,
>> >> >> whenever I run a Spark Streaming application I encounter the
>> >> >> following
>> >> >> error:
>> >> >>
>> >> >> java.lang.IllegalArgumentException: requirement failed: numRecords
>> >> >> must
>> >> >> not be negative
>> >> >>         at scala.Predef$.require(Predef.scala:224)
>> >> >>         at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
>> >> >>         at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
>> >> >>         at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >> >>         at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >> >>         at
>> >> >> scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> >> >>         at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >> >>         at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >> >>         at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>> >> >>         at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>> >> >>         at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>> >> >>         at scala.Option.orElse(Option.scala:289)
>> >> >>
>> >> >> The application starts fine, but as soon as the Kafka producers
>> >> >> start
>> >> >> emitting the stream data I start receiving the aforementioned error
>> >> >> repeatedly.
>> >> >>
>> >> >> I have tried removing Spark Streaming checkpointing files as has
>> >> >> been
>> >> >> suggested in similar posts on the internet. However, the problem
>> >> >> persists
>> >> >> even if I start a Kafka topic and its corresponding consumer Spark
>> >> >> Streaming
>> >> >> application for the first time. Also the problem could not be offset
>> >> >> related
>> >> >> as I start the topic for the first time.
>> >> >>
>> >> >> Although the application seems to be processing the stream properly
>> >> >> as
>> >> >> I
>> >> >> can see by the benchmark numbers generated. However, the numbers are
>> >> >> way of
>> >> >> from what I got for Storm and Flink, suspecting me to believe that
>> >> >> there is
>> >> >> something wrong with the pipeline and Spark is not able to process
>> >> >> the
>> >> >> stream as cleanly as it should. Any help in this regard would be
>> >> >> really
>> >> >> appreciated.
>> >> >
>> >> >
>> >
>> >
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

Posted by Muhammad Haseeb Javed <11...@seecs.edu.pk>.
I just noticed that Spark version that I am using (2.0.2) is built with
Scala 2.11. However I am using Kafka 0.8.2 built with Scala 2.10. Could
this be the reason why we are getting this error?

On Mon, Feb 20, 2017 at 5:50 PM, Cody Koeninger <co...@koeninger.org> wrote:

> So there's no reason to use checkpointing at all, right?  Eliminate
> that as a possible source of problems.
>
> Probably unrelated, but this also isn't a very good way to benchmark.
> Kafka producers are threadsafe, there's no reason to create one for
> each partition.
>
> On Mon, Feb 20, 2017 at 4:43 PM, Muhammad Haseeb Javed
> <11...@seecs.edu.pk> wrote:
> > This is the code that I have been trying is giving me this error. No
> > complicated operation being performed on the topics as far as I can see.
> >
> > class Identity() extends BenchBase {
> >
> >
> >   override def process(lines: DStream[(Long, String)], config:
> > SparkBenchConfig): Unit = {
> >
> >     val reportTopic = config.reporterTopic
> >
> >     val brokerList = config.brokerList
> >
> >
> >     lines.foreachRDD(rdd => rdd.foreachPartition( partLines => {
> >
> >       val reporter = new KafkaReporter(reportTopic, brokerList)
> >
> >       partLines.foreach{ case (inTime , content) =>
> >
> >         val outTime = System.currentTimeMillis()
> >
> >         reporter.report(inTime, outTime)
> >
> >         if(config.debugMode) {
> >
> >           println("Event: " + inTime + ", " + outTime)
> >
> >         }
> >
> >       }
> >
> >     }))
> >
> >   }
> >
> > }
> >
> >
> > On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
> >>
> >> That's an indication that the beginning offset for a given batch is
> >> higher than the ending offset, i.e. something is seriously wrong.
> >>
> >> Are you doing anything at all odd with topics, i.e. deleting and
> >> recreating them, using compacted topics, etc?
> >>
> >> Start off with a very basic stream over the same kafka topic that just
> >> does foreach println or similar, with no checkpointing at all, and get
> >> that working first.
> >>
> >> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
> >> <11...@seecs.edu.pk> wrote:
> >> > Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10
> >> >
> >> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
> >> > <11...@seecs.edu.pk> wrote:
> >> >>
> >> >> I am PhD student at Ohio State working on a study to evaluate
> streaming
> >> >> frameworks (Spark Streaming, Storm, Flink) using the the Intel
> HiBench
> >> >> benchmarks. But I think I am having a problem  with Spark. I have
> Spark
> >> >> Streaming application which I am trying to run on a 5 node cluster
> >> >> (including master). I have 2 zookeeper and 4 kafka brokers. However,
> >> >> whenever I run a Spark Streaming application I encounter the
> following
> >> >> error:
> >> >>
> >> >> java.lang.IllegalArgumentException: requirement failed: numRecords
> must
> >> >> not be negative
> >> >>         at scala.Predef$.require(Predef.scala:224)
> >> >>         at
> >> >>
> >> >> org.apache.spark.streaming.scheduler.StreamInputInfo.<
> init>(InputInfoTracker.scala:38)
> >> >>         at
> >> >>
> >> >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(
> DirectKafkaInputDStream.scala:165)
> >> >>         at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >>         at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >>         at
> >> >> scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >> >>         at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >>         at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >>         at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream.
> createRDDWithLocalProperties(DStream.scala:415)
> >> >>         at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:335)
> >> >>         at
> >> >>
> >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:333)
> >> >>         at scala.Option.orElse(Option.scala:289)
> >> >>
> >> >> The application starts fine, but as soon as the Kafka producers start
> >> >> emitting the stream data I start receiving the aforementioned error
> >> >> repeatedly.
> >> >>
> >> >> I have tried removing Spark Streaming checkpointing files as has been
> >> >> suggested in similar posts on the internet. However, the problem
> >> >> persists
> >> >> even if I start a Kafka topic and its corresponding consumer Spark
> >> >> Streaming
> >> >> application for the first time. Also the problem could not be offset
> >> >> related
> >> >> as I start the topic for the first time.
> >> >>
> >> >> Although the application seems to be processing the stream properly
> as
> >> >> I
> >> >> can see by the benchmark numbers generated. However, the numbers are
> >> >> way of
> >> >> from what I got for Storm and Flink, suspecting me to believe that
> >> >> there is
> >> >> something wrong with the pipeline and Spark is not able to process
> the
> >> >> stream as cleanly as it should. Any help in this regard would be
> really
> >> >> appreciated.
> >> >
> >> >
> >
> >
>

Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

Posted by Cody Koeninger <co...@koeninger.org>.
So there's no reason to use checkpointing at all, right?  Eliminate
that as a possible source of problems.

Probably unrelated, but this also isn't a very good way to benchmark.
Kafka producers are threadsafe, there's no reason to create one for
each partition.

On Mon, Feb 20, 2017 at 4:43 PM, Muhammad Haseeb Javed
<11...@seecs.edu.pk> wrote:
> This is the code that I have been trying is giving me this error. No
> complicated operation being performed on the topics as far as I can see.
>
> class Identity() extends BenchBase {
>
>
>   override def process(lines: DStream[(Long, String)], config:
> SparkBenchConfig): Unit = {
>
>     val reportTopic = config.reporterTopic
>
>     val brokerList = config.brokerList
>
>
>     lines.foreachRDD(rdd => rdd.foreachPartition( partLines => {
>
>       val reporter = new KafkaReporter(reportTopic, brokerList)
>
>       partLines.foreach{ case (inTime , content) =>
>
>         val outTime = System.currentTimeMillis()
>
>         reporter.report(inTime, outTime)
>
>         if(config.debugMode) {
>
>           println("Event: " + inTime + ", " + outTime)
>
>         }
>
>       }
>
>     }))
>
>   }
>
> }
>
>
> On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> That's an indication that the beginning offset for a given batch is
>> higher than the ending offset, i.e. something is seriously wrong.
>>
>> Are you doing anything at all odd with topics, i.e. deleting and
>> recreating them, using compacted topics, etc?
>>
>> Start off with a very basic stream over the same kafka topic that just
>> does foreach println or similar, with no checkpointing at all, and get
>> that working first.
>>
>> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
>> <11...@seecs.edu.pk> wrote:
>> > Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10
>> >
>> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
>> > <11...@seecs.edu.pk> wrote:
>> >>
>> >> I am PhD student at Ohio State working on a study to evaluate streaming
>> >> frameworks (Spark Streaming, Storm, Flink) using the the Intel HiBench
>> >> benchmarks. But I think I am having a problem  with Spark. I have Spark
>> >> Streaming application which I am trying to run on a 5 node cluster
>> >> (including master). I have 2 zookeeper and 4 kafka brokers. However,
>> >> whenever I run a Spark Streaming application I encounter the following
>> >> error:
>> >>
>> >> java.lang.IllegalArgumentException: requirement failed: numRecords must
>> >> not be negative
>> >>         at scala.Predef$.require(Predef.scala:224)
>> >>         at
>> >>
>> >> org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
>> >>         at
>> >>
>> >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
>> >>         at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >>         at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >>         at
>> >> scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> >>         at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >>         at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >>         at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>> >>         at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>> >>         at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>> >>         at scala.Option.orElse(Option.scala:289)
>> >>
>> >> The application starts fine, but as soon as the Kafka producers start
>> >> emitting the stream data I start receiving the aforementioned error
>> >> repeatedly.
>> >>
>> >> I have tried removing Spark Streaming checkpointing files as has been
>> >> suggested in similar posts on the internet. However, the problem
>> >> persists
>> >> even if I start a Kafka topic and its corresponding consumer Spark
>> >> Streaming
>> >> application for the first time. Also the problem could not be offset
>> >> related
>> >> as I start the topic for the first time.
>> >>
>> >> Although the application seems to be processing the stream properly as
>> >> I
>> >> can see by the benchmark numbers generated. However, the numbers are
>> >> way of
>> >> from what I got for Storm and Flink, suspecting me to believe that
>> >> there is
>> >> something wrong with the pipeline and Spark is not able to process the
>> >> stream as cleanly as it should. Any help in this regard would be really
>> >> appreciated.
>> >
>> >
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

Posted by Muhammad Haseeb Javed <11...@seecs.edu.pk>.
This is the code that I have been trying is giving me this error. No
complicated operation being performed on the topics as far as I can see.

class Identity() extends BenchBase {


  override def process(lines: DStream[(Long, String)], config:
SparkBenchConfig): Unit = {

    val reportTopic = config.reporterTopic

    val brokerList = config.brokerList


    lines.foreachRDD(rdd => rdd.foreachPartition( partLines => {

      val reporter = new KafkaReporter(reportTopic, brokerList)

      partLines.foreach{ case (inTime , content) =>

        val outTime = System.currentTimeMillis()

        reporter.report(inTime, outTime)

        if(config.debugMode) {

          println("Event: " + inTime + ", " + outTime)

        }

      }

    }))

  }

}

On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger <co...@koeninger.org> wrote:

> That's an indication that the beginning offset for a given batch is
> higher than the ending offset, i.e. something is seriously wrong.
>
> Are you doing anything at all odd with topics, i.e. deleting and
> recreating them, using compacted topics, etc?
>
> Start off with a very basic stream over the same kafka topic that just
> does foreach println or similar, with no checkpointing at all, and get
> that working first.
>
> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
> <11...@seecs.edu.pk> wrote:
> > Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10
> >
> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
> > <11...@seecs.edu.pk> wrote:
> >>
> >> I am PhD student at Ohio State working on a study to evaluate streaming
> >> frameworks (Spark Streaming, Storm, Flink) using the the Intel HiBench
> >> benchmarks. But I think I am having a problem  with Spark. I have Spark
> >> Streaming application which I am trying to run on a 5 node cluster
> >> (including master). I have 2 zookeeper and 4 kafka brokers. However,
> >> whenever I run a Spark Streaming application I encounter the following
> >> error:
> >>
> >> java.lang.IllegalArgumentException: requirement failed: numRecords must
> >> not be negative
> >>         at scala.Predef$.require(Predef.scala:224)
> >>         at
> >> org.apache.spark.streaming.scheduler.StreamInputInfo.<
> init>(InputInfoTracker.scala:38)
> >>         at
> >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(
> DirectKafkaInputDStream.scala:165)
> >>         at
> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >>         at
> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >>         at scala.util.DynamicVariable.withValue(DynamicVariable.
> scala:58)
> >>         at
> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >>         at
> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >>         at
> >> org.apache.spark.streaming.dstream.DStream.
> createRDDWithLocalProperties(DStream.scala:415)
> >>         at
> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:335)
> >>         at
> >> org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:333)
> >>         at scala.Option.orElse(Option.scala:289)
> >>
> >> The application starts fine, but as soon as the Kafka producers start
> >> emitting the stream data I start receiving the aforementioned error
> >> repeatedly.
> >>
> >> I have tried removing Spark Streaming checkpointing files as has been
> >> suggested in similar posts on the internet. However, the problem
> persists
> >> even if I start a Kafka topic and its corresponding consumer Spark
> Streaming
> >> application for the first time. Also the problem could not be offset
> related
> >> as I start the topic for the first time.
> >>
> >> Although the application seems to be processing the stream properly as I
> >> can see by the benchmark numbers generated. However, the numbers are
> way of
> >> from what I got for Storm and Flink, suspecting me to believe that
> there is
> >> something wrong with the pipeline and Spark is not able to process the
> >> stream as cleanly as it should. Any help in this regard would be really
> >> appreciated.
> >
> >
>

Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

Posted by Cody Koeninger <co...@koeninger.org>.
That's an indication that the beginning offset for a given batch is
higher than the ending offset, i.e. something is seriously wrong.

Are you doing anything at all odd with topics, i.e. deleting and
recreating them, using compacted topics, etc?

Start off with a very basic stream over the same kafka topic that just
does foreach println or similar, with no checkpointing at all, and get
that working first.

On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
<11...@seecs.edu.pk> wrote:
> Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10
>
> On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
> <11...@seecs.edu.pk> wrote:
>>
>> I am PhD student at Ohio State working on a study to evaluate streaming
>> frameworks (Spark Streaming, Storm, Flink) using the the Intel HiBench
>> benchmarks. But I think I am having a problem  with Spark. I have Spark
>> Streaming application which I am trying to run on a 5 node cluster
>> (including master). I have 2 zookeeper and 4 kafka brokers. However,
>> whenever I run a Spark Streaming application I encounter the following
>> error:
>>
>> java.lang.IllegalArgumentException: requirement failed: numRecords must
>> not be negative
>>         at scala.Predef$.require(Predef.scala:224)
>>         at
>> org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
>>         at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>         at
>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>>         at scala.Option.orElse(Option.scala:289)
>>
>> The application starts fine, but as soon as the Kafka producers start
>> emitting the stream data I start receiving the aforementioned error
>> repeatedly.
>>
>> I have tried removing Spark Streaming checkpointing files as has been
>> suggested in similar posts on the internet. However, the problem persists
>> even if I start a Kafka topic and its corresponding consumer Spark Streaming
>> application for the first time. Also the problem could not be offset related
>> as I start the topic for the first time.
>>
>> Although the application seems to be processing the stream properly as I
>> can see by the benchmark numbers generated. However, the numbers are way of
>> from what I got for Storm and Flink, suspecting me to believe that there is
>> something wrong with the pipeline and Spark is not able to process the
>> stream as cleanly as it should. Any help in this regard would be really
>> appreciated.
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

Posted by Muhammad Haseeb Javed <11...@seecs.edu.pk>.
Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10

On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed <
11besemjaved@seecs.edu.pk> wrote:

> I am PhD student at Ohio State working on a study to evaluate streaming
> frameworks (Spark Streaming, Storm, Flink) using the the Intel HiBench
> benchmarks. But I think I am having a problem  with Spark. I have Spark
> Streaming application which I am trying to run on a 5 node cluster
> (including master). I have 2 zookeeper and 4 kafka brokers. However,
> whenever I run a Spark Streaming application I encounter the following
> error:
>
> java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
>         at scala.Predef$.require(Predef.scala:224)
>         at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
>         at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>         at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>         at scala.Option.orElse(Option.scala:289)
>
> The application starts fine, but as soon as the Kafka producers start
> emitting the stream data I start receiving the aforementioned error
> repeatedly.
>
> I have tried removing Spark Streaming checkpointing files as has been
> suggested in similar posts on the internet. However, the problem persists
> even if I start a Kafka topic and its corresponding consumer Spark
> Streaming application for the first time. Also the problem could not be
> offset related as I start the topic for the first time.
> Although the application seems to be processing the stream properly as I
> can see by the benchmark numbers generated. However, the numbers are way of
> from what I got for Storm and Flink, suspecting me to believe that there is
> something wrong with the pipeline and Spark is not able to process the
> stream as cleanly as it should. Any help in this regard would be really
> appreciated.
>