You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Nico Kruber (JIRA)" <ji...@apache.org> on 2017/08/01 11:39:00 UTC

[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

    [ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16108748#comment-16108748 ] 

Nico Kruber commented on FLINK-6996:
------------------------------------

I got another incarnation (seen only once) with a different failure (only change in there is switching from {{HeapMemorySegment}} to {{HybridMemorySegment}} but since the memory type was not changed (still at on-heap by default) this should not be related.

{code}
09:02:49,616 ERROR org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase  - 
--------------------------------------------------------------------------------
Test testOneToOneAtLeastOnceCustomOperator(org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase) failed with:
java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:44)
	at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:198)
	at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:89)
	at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:89)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at kafka.log.LogCleaner.<init>(LogCleaner.scala:89)
	at kafka.log.LogManager.<init>(LogManager.scala:72)
	at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:648)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:208)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:433)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.restartBroker(KafkaTestEnvironmentImpl.java:181)
	at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:282)
	at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator(KafkaProducerTestBase.java:212)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
{code}

https://transfer.sh/H7pW5/369.3.tar.gz

> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --------------------------------------------------------------
>
>                 Key: FLINK-6996
>                 URL: https://issues.apache.org/jira/browse/FLINK-6996
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.2
>
>
> FlinkKafkaProducer010 doesn't implement CheckpointedFunction interface. This means, when it's used like a "regular sink function" (option a from [the java doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]) it will not flush the data on "snapshotState"  as it is supposed to.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)