Posted to issues@flink.apache.org by "Nico Kruber (JIRA)" <ji...@apache.org> on 2017/08/01 11:39:00 UTC
[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
[ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16108748#comment-16108748 ]
Nico Kruber commented on FLINK-6996:
------------------------------------
I got another incarnation (seen only once) with a different failure. The only change in there is switching from {{HeapMemorySegment}} to {{HybridMemorySegment}}, but since the memory type was not changed (still on-heap by default), this should not be related.
{code}
09:02:49,616 ERROR org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase -
--------------------------------------------------------------------------------
Test testOneToOneAtLeastOnceCustomOperator(org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase) failed with:
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:44)
at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:198)
at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:89)
at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:89)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.log.LogCleaner.<init>(LogCleaner.scala:89)
at kafka.log.LogManager.<init>(LogManager.scala:72)
at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:648)
at kafka.server.KafkaServer.startup(KafkaServer.scala:208)
at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:433)
at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.restartBroker(KafkaTestEnvironmentImpl.java:181)
at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:282)
at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator(KafkaProducerTestBase.java:212)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
{code}
https://transfer.sh/H7pW5/369.3.tar.gz
> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --------------------------------------------------------------
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This means that when it is used as a "regular sink function" (option a from [the java doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]), it does not flush the data on "snapshotState" as it is supposed to.
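To illustrate why the missing flush matters, here is a minimal plain-Java sketch. It deliberately uses neither the Flink nor the Kafka API: {{BufferingClient}} and {{FlushOnSnapshotSink}} are hypothetical names made up for this example, and {{snapshotState()}} only models the checkpoint hook mentioned in the description. The assumption is that the producer buffers records in memory and that a completed checkpoint is only safe if that buffer is empty.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an asynchronous producer client (not a Kafka API).
// Records passed to send() stay in memory until flush() is called.
class BufferingClient {
    private final List<String> pending = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    void send(String record) {
        pending.add(record);
    }

    void flush() {
        delivered.addAll(pending);
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }

    int deliveredCount() {
        return delivered.size();
    }
}

// Hypothetical sink (not a Flink class). snapshotState() models the
// checkpoint hook; flushOnSnapshot toggles the behaviour under discussion.
class FlushOnSnapshotSink {
    private final BufferingClient client;
    private final boolean flushOnSnapshot;

    FlushOnSnapshotSink(BufferingClient client, boolean flushOnSnapshot) {
        this.client = client;
        this.flushOnSnapshot = flushOnSnapshot;
    }

    void invoke(String record) {
        client.send(record);
    }

    // Returns the number of records still buffered when the checkpoint
    // completes, i.e. records that would be lost if the process died now
    // and the job restarted from this checkpoint.
    int snapshotState() {
        if (flushOnSnapshot) {
            client.flush();
        }
        return client.pendingCount();
    }
}

class FlushOnSnapshotDemo {
    public static void main(String[] args) {
        FlushOnSnapshotSink withoutFlush =
                new FlushOnSnapshotSink(new BufferingClient(), false);
        withoutFlush.invoke("a");
        withoutFlush.invoke("b");
        System.out.println("at risk without flush: " + withoutFlush.snapshotState());

        FlushOnSnapshotSink withFlush =
                new FlushOnSnapshotSink(new BufferingClient(), true);
        withFlush.invoke("a");
        withFlush.invoke("b");
        System.out.println("at risk with flush: " + withFlush.snapshotState());
    }
}
```

In this model, a sink that does not flush in {{snapshotState()}} can acknowledge a checkpoint while records are still only in memory, which is the gap in the at-least-once guarantee that the issue describes.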