Posted to dev@samza.apache.org by Liu Bo <di...@gmail.com> on 2016/09/08 07:57:59 UTC

How to handle RecordTooLargeException while sending messages to a Kafka system on 0.10.1

Hi group

We run into a RecordTooLargeException while sending messages to our outgoing Kafka
system. We had solved this on Samza 0.10.0, but now it's back in 0.10.1. Here are
the details.

We are analyzing crawled web pages, and some messages grow beyond
max.request.size after new features are added to them. Our current strategy is
simply to throw those messages away.

We have several configurable serialization strategies, which makes it hard to
compute the exact size of a message before it is finally serialized to a byte
array.
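
For illustration, here is roughly what such a pre-check would look like if the
task did the serialization itself. This is only a sketch: the task class, the
output stream name, the injected serde, and the 1 MB limit are placeholders,
not our real code.

import org.apache.samza.serializers.Serde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

/**
 * Sketch only: drop a message before sending if its serialized form would be
 * larger than the producer's max.request.size. With our configurable
 * serialization strategies we do not actually have the final byte size at
 * this point, which is exactly the problem described above.
 */
public class SizeGuardTask implements StreamTask {
  // Hypothetical output stream on the outgoing Kafka system.
  private static final SystemStream OUTPUT = new SystemStream("message", "documents_out");
  // Assumed producer limit (Kafka's default max.request.size is 1 MB).
  private static final int MAX_REQUEST_SIZE = 1024 * 1024;

  // Whichever configured serialization strategy is active (placeholder).
  private final Serde<Object> serde;

  public SizeGuardTask(Serde<Object> serde) {
    this.serde = serde;
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    Object enriched = envelope.getMessage(); // new features would be added here
    byte[] payload = serde.toBytes(enriched); // serialize explicitly to measure size
    if (payload.length > MAX_REQUEST_SIZE) {
      return; // too large for the outgoing Kafka system: throw it away
    }
    collector.send(new OutgoingMessageEnvelope(OUTPUT, payload));
  }
}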

What we did on 0.10.0 was to catch the processing exception and check whether
it was caused by a RecordTooLargeException. If so, our task would hide the
exception and continue processing.
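
For reference, a simplified version of that 0.10.0 workaround; the stream name
and the helper method here are illustrative, not our exact code.

import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

/**
 * Simplified sketch of what we did on 0.10.0: catch the SamzaException thrown
 * by collector.send() and swallow it only when a RecordTooLargeException is
 * somewhere in the cause chain; otherwise rethrow it.
 */
public class TolerantTask implements StreamTask {
  // Hypothetical output stream on the outgoing Kafka system.
  private static final SystemStream OUTPUT = new SystemStream("message", "documents_out");

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    try {
      collector.send(new OutgoingMessageEnvelope(OUTPUT, envelope.getMessage()));
    } catch (SamzaException e) {
      if (causedByRecordTooLarge(e)) {
        return; // hide the oversized message and keep processing
      }
      throw e;
    }
  }

  // Walk the cause chain looking for a RecordTooLargeException.
  private static boolean causedByRecordTooLarge(Throwable t) {
    for (Throwable cause = t; cause != null; cause = cause.getCause()) {
      if (cause instanceof RecordTooLargeException) {
        return true;
      }
    }
    return false;
  }
}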

This approach no longer seems to work on 0.10.1. Our task can still catch the
exception and ignore it in process(), but it seems that the task commit
re-flushes the large message and causes the container to fail.

It seems that TaskInstance.commit is beyond the control of our task, as
nothing from our CppTask appears in that stack trace.

Is there a better way to handle RecordTooLargeException? Your help is much
appreciated.
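
For completeness: the limit in the error message below is the Kafka producer's
max.request.size. As far as I understand Samza's config pass-through, it could
be raised per system roughly as shown here, but we would rather keep dropping
the oversized pages than raise the limit (values are hypothetical):

# systems.<system-name>.producer.* is passed through to the Kafka producer;
# "message" is our outgoing system name.
systems.message.producer.max.request.size=2097152
# The broker/topic side (message.max.bytes) would also need to allow larger records.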

The stack trace where we first catch the exception in our CppTask:

2016-09-07 23:03:50.283 [main] CppTask [ERROR] error processing message at partition : SystemStreamPartition [message, documents_nlp, 3], offset: 3166714
2016-09-07 23:03:50.283 [main] CppTask [ERROR] CAUSE: Unable to send message from TaskName-Partition 3 to system message.

org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 3 to system message.
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:165)
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:149)
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:149)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
        at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:149)
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$stop$1.apply(KafkaSystemProducer.scala:64)
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$stop$1.apply(KafkaSystemProducer.scala:64)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
        at org.apache.samza.system.kafka.KafkaSystemProducer.stop(KafkaSystemProducer.scala:64)
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:130)
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:125)
        at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81)
        at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:100)
        at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
        at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61)
        *at com.xxx.cpp.samza.task.CppTask.process(CppTask.java:155)*
        at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:150)
        at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
        *at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:149)*
        at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$process$1$$anonfun$apply$mcVJ$sp$2.apply(RunLoop.scala:133)
        at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$process$1$$anonfun$apply$mcVJ$sp$2.apply(RunLoop.scala:130)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$process$1.apply$mcVJ$sp(RunLoop.scala:129)
        at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
        at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:37)
        at org.apache.samza.container.RunLoop.org$apache$samza$container$RunLoop$$process(RunLoop.scala:121)
        at org.apache.samza.container.RunLoop$$anon$2.run(RunLoop.scala:78)
        at org.apache.samza.util.ThrottlingExecutor.execute(ThrottlingExecutor.java:64)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:88)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:594)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:82)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:56)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)

Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1214147 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

The stack trace that actually causes the container to fail:

2016-09-07 23:03:50.287 [main] SamzaContainer [ERROR] Caught exception in process loop.
org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 3 to system message.
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:165)
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:149)
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:149)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
        at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:149)
        at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
        at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
        at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:70)
        *at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:182)*
        at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:173)
        at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:173)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$commit$1.apply$mcVJ$sp(RunLoop.scala:173)
        at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
        at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:37)
        at org.apache.samza.container.RunLoop.org$apache$samza$container$RunLoop$$commit(RunLoop.scala:168)
        at org.apache.samza.container.RunLoop$$anon$2.run(RunLoop.scala:80)
        at org.apache.samza.util.ThrottlingExecutor.execute(ThrottlingExecutor.java:64)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:88)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:594)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:82)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:56)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1214147 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2016-09-07 23:03:50.288 [main] SamzaContainer [INFO] Shutting down.
2016-09-07 23:03:50.289 [main] SamzaContainer [INFO] Shutting down consumer multiplexer.

-- 
All the best

Liu Bo