Posted to dev@samza.apache.org by 舒琦 <sh...@eefung.com> on 2017/02/23 08:27:30 UTC

Exceed max.request.size of Kafka

Hi,

	Sometimes very large records, around 2MB, occur in our flow. When that happens, Samza currently catches the exception and shuts down, as shown below. What I want is to handle this specific exception, discard the offending data, and let the flow continue.

2017-02-23 16:17:01.949 [main] SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 0 to system kafka.
	at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:133)
	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:661)
	at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:115)
	at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:89)
	at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 0 to system kafka.
	at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:177)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:350)
	at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:162)
	at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
	at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:60)
	at com.antfact.datacenter.canal.task.tags.DocumentTagTask.process(DocumentTagTask.java:127)
	at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
	at org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
	at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:157)
	at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
	at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:155)
	at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:356)
	at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:325)
	at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:283)
	at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:199)
	at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:144)
	... 4 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 881729 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down.
2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down consumer multiplexer.
2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] Shutting down BrokerProxy for 172.19.105.20:9096
2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] closing simple consumer...
2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.20:9096 for client samza_consumer-canal_doc_tag-1] BrokerProxy [INFO] Got interrupt exception in broker proxy thread.
2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] Shutting down BrokerProxy for 172.19.105.22:9096
2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] closing simple consumer...
2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.22:9096 for client samza_consumer-canal_doc_tag-1] BrokerProxy [INFO] Got interrupt exception in broker proxy thread.
2017-02-23 16:17:01.941 [main] SamzaContainer [INFO] Shutting down task instance stream tasks.
2017-02-23 16:17:01.942 [main] SamzaContainer [INFO] Shutting down task instance stores.
2017-02-23 16:17:01.943 [main] SamzaContainer [INFO] Shutting down host statistics monitor.
2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down producer multiplexer.
2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down locality manager.
2017-02-23 16:17:01.944 [main] CoordinatorStreamSystemProducer [INFO] Stopping coordinator stream producer.
2017-02-23 16:17:01.945 [main] SamzaContainer [INFO] Shutting down offset manager.
2017-02-23 16:17:01.946 [main] SamzaContainer [INFO] Shutting down metrics reporters.
2017-02-23 16:17:01.946 [main] MetricsSnapshotReporter [INFO] Stopping producer.
2017-02-23 16:17:01.947 [main] MetricsSnapshotReporter [INFO] Stopping reporter timer.
2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutting down JVM metrics.
2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutdown complete.

Thanks!

————————
QiShu


Re: Exceed max.request.size of Kafka

Posted by 舒琦 <sh...@eefung.com>.
Hi Jagadish,

I had already tried both of the methods you suggested yesterday, and then found that I was using the wrong exception type. After I changed it to SamzaException, it worked.
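
The stack trace above shows the RecordTooLargeException wrapped inside a SamzaException, which is presumably why catching the Kafka type directly never matched. A minimal sketch of one way to tell such a failure apart from other SamzaExceptions is to walk the cause chain. The class and method names here (CauseChain, hasCause) are hypothetical helpers, not part of Samza or Kafka:

```java
final class CauseChain {
    private CauseChain() {}

    /**
     * Returns true if t, or any exception in its cause chain, has the
     * given fully-qualified class name.
     */
    static boolean hasCause(Throwable t, String className) {
        // Bound the walk so a cyclic cause chain cannot loop forever.
        for (int depth = 0; t != null && depth < 32; depth++) {
            if (t.getClass().getName().equals(className)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }
}
```

Inside process(), a catch block for SamzaException could then call CauseChain.hasCause(e, "org.apache.kafka.common.errors.RecordTooLargeException"), discard the message when it returns true, and rethrow otherwise. Comparing by class name keeps the helper free of a compile-time dependency on the Kafka client.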

Thanks for your help and explanation!

————————
QiShu


Re: Exceed max.request.size of Kafka

Posted by Jagadish Venkatraman <ja...@gmail.com>.
Hi QiShu,

1.  I see the exception occurring in your *process* method. It seems that
the message you are trying to send is larger than the producer's configured
max.request.size (1 MB by default in Kafka). You can choose to catch the
exception in your process method and move on. Wouldn't that work for you?

{code}

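import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class DocumentTagTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // custom logic
    try {
      // logic that could potentially throw an exception,
      // e.g. collector.send(msg)
    } catch (Exception e) {
      // handle the exception and move on
    }
  }
}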

{code}
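
A complementary option, not something Samza does for you, is to check the serialized size before calling collector.send and skip the record if it is too big. The sketch below assumes the payload is a String serialized as UTF-8; SizeGuard and fits are hypothetical names, and the limit is passed in because it depends on the job's max.request.size setting. The producer's own check also counts the key and record overhead, so leave some headroom:

```java
import java.nio.charset.StandardCharsets;

final class SizeGuard {
    private SizeGuard() {}

    /** Returns true if the UTF-8 encoding of payload is at most maxBytes long. */
    static boolean fits(String payload, int maxBytes) {
        // Measure the encoded length, not the character count:
        // non-ASCII characters take more than one byte in UTF-8.
        return payload.getBytes(StandardCharsets.UTF_8).length <= maxBytes;
    }
}
```

In the task, the call to collector.send would be made only when SizeGuard.fits(payload, limit) returns true, and the record would be logged and dropped otherwise.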

2. Alternatively, if you want a config-driven approach, you may want to look
at the following properties in the Samza config table
<https://samza.apache.org/learn/documentation/0.12/jobs/configuration-table.html>:

task.ignored.exceptions
    This property specifies which exceptions should be ignored if thrown in
    a task's process or window methods. The exceptions to be ignored should
    be a comma-separated list of fully-qualified class names of the
    exceptions, or * to ignore all exceptions.

task.drop.deserialization.errors
    This property defines how the system handles deserialization failures.
    If set to true, the system will skip the failing messages and keep
    running. If set to false, the system will throw exceptions and fail the
    container. Default is false.

task.drop.serialization.errors
    This property defines how the system handles serialization failures. If
    set to true, the system will drop the failing messages and keep running.
    If set to false, the system will throw exceptions and fail the
    container. Default is false.
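
As a rough illustration of the config-driven approach, the properties above might be set like this in the job's .properties file. This is only a sketch: the exception class is the wrapper type seen in the stack trace in this thread, and ignoring SamzaException will also hide any other failure that surfaces as that type.

```properties
# Ignore this exception type when it is thrown from process() or window()
task.ignored.exceptions=org.apache.samza.SamzaException
# Left at their defaults; set to true only if dropping bad messages is acceptable
task.drop.deserialization.errors=false
task.drop.serialization.errors=false
```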

Thanks,
Jagadish


-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University