You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Edin (JIRA)" <ji...@apache.org> on 2019/01/26 19:12:00 UTC

[jira] [Updated] (BEAM-6516) Failed to advance source: org.apache.beam.sdk.io.rabbitmq.RabbitMqIO

     [ https://issues.apache.org/jira/browse/BEAM-6516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Edin updated BEAM-6516:
-----------------------
    Description: 
I'm using the RabbitMqIO connector to get messages from RabbitMQ to BigQuery in my current pipeline. When I submit my pipeline to Dataflow, the messages are propagated correctly to BigQuery, however I'm getting the following log error entry in Dataflow which is repeating itself:

{code:java}

java.io.IOException: Failed to advance source: org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQSource@1cdd077d
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:806)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
        org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:474)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
        com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:206)
        com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:237)
        org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:451)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
        com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:515)
        com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340)
        com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
        com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109)
        com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:676)
        com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
        com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:603)
        java.lang.Thread.run(Thread.java:745)
{code}

It seems to me the issue is ACKing messages:
{code:java}
reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80
{code}

I also see a lot of open channels (~90) in the RabbitMQ management interface.

  was:
I'm using the RabbitMqIO connector to get messages from RabbitMQ to BigQuery in my current pipeline. When I submit my pipeline to Dataflow, the messages are propagated correctly to BigQuery, however I'm getting the following log error entry in Dataflow which is repeating itself:

{code:java}

java.io.IOException: Failed to advance source: org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQSource@1cdd077d
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:806)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
        org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:474)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
        com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:206)
        com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:237)
        org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:451)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
        com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:515)
        com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340)
        com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
        com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109)
        com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:676)
        com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
        com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:603)
        java.lang.Thread.run(Thread.java:745)
{code}

It seems to me the issue is ACKing messages:
{code:java}
reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80
{code}


> Failed to advance source: org.apache.beam.sdk.io.rabbitmq.RabbitMqIO
> --------------------------------------------------------------------
>
>                 Key: BEAM-6516
>                 URL: https://issues.apache.org/jira/browse/BEAM-6516
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-rabbitmq
>            Reporter: Edin
>            Priority: Major
>
> I'm using the RabbitMqIO connector to get messages from RabbitMQ to BigQuery in my current pipeline. When I submit my pipeline to Dataflow, the messages are propagated correctly to BigQuery, however I'm getting the following log error entry in Dataflow which is repeating itself:
> {code:java}
> java.io.IOException: Failed to advance source: org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQSource@1cdd077d
>         org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:806)
>         org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
>         org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:474)
>         org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
>         org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
>         com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:206)
>         com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:237)
>         org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:451)
>         org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
>         org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>         org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         java.lang.Thread.run(Thread.java:745)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
>         com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:515)
>         com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340)
>         com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
>         com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109)
>         com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:676)
>         com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
>         com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:603)
>         java.lang.Thread.run(Thread.java:745)
> {code}
> It seems to me the issue is ACKing messages:
> {code:java}
> reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80
> {code}
> I also see a lot of open channels (~90) in the RabbitMQ management interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)