You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2015/09/22 17:32:05 UTC

[jira] [Created] (FLINK-2735) KafkaProducerITCase.testCustomPartitioning sporadically fails

Robert Metzger created FLINK-2735:
-------------------------------------

             Summary: KafkaProducerITCase.testCustomPartitioning sporadically fails
                 Key: FLINK-2735
                 URL: https://issues.apache.org/jira/browse/FLINK-2735
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 0.10
            Reporter: Robert Metzger


In the following test run: https://s3.amazonaws.com/archive.travis-ci.org/jobs/81584444/log.txt
there was the following failure

{code}
Caused by: java.lang.Exception: Unable to get last offset for topic customPartitioningTestTopic and partitions [FetchPartition {partition=2, offset=-915623761776}]. 
Exception for partition 2: kafka.common.UnknownException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
	at java.lang.Class.newInstance(Class.java:438)
	at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
	at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:521)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370)

	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Unable to get last offset for topic customPartitioningTestTopic and partitions [FetchPartition {partition=2, offset=-915623761776}]. 
Exception for partition 2: kafka.common.UnknownException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
	at java.lang.Class.newInstance(Class.java:438)
	at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
	at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:521)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370)

	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:524)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370)
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 17.455 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase
testCustomPartitioning(org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase)  Time elapsed: 7.809 sec  <<< FAILURE!
java.lang.AssertionError: Test failed: The program execution failed: Job execution failed.
	at org.junit.Assert.fail(Assert.java:88)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.tryExecute(KafkaTestBase.java:313)
	at org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase.testCustomPartitioning(KafkaProducerITCase.java:155)
{code}

>From the broker logs it seems to be an issue in the Kafka broker

{code}
14:43:03,328 INFO  kafka.network.Processor                                       - Closing socket connection to /127.0.0.1.
14:43:03,334 WARN  kafka.server.KafkaApis                                        - [KafkaApi-0] Error while responding to offset request
java.lang.ArrayIndexOutOfBoundsException: 1
	at kafka.server.KafkaApis.fetchOffsetsBefore(KafkaApis.scala:469)
	at kafka.server.KafkaApis.fetchOffsets(KafkaApis.scala:449)
	at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:411)
	at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:402)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:402)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:61)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
	at java.lang.Thread.run(Thread.java:745)
{code}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)