Posted to user@flink.apache.org by Paul Joireman <pa...@physiq.com> on 2016/08/08 14:33:37 UTC

Using RabbitMQ Sinks

Hi all,


The documentation describing the use of RabbitMQ as a sink gives the following example:


RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost").setPort(5000).setUserName(..)
.setPassword(..).setVirtualHost("/").build();
stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer()));

However, a search of the Flink GitHub mirror repo<https://github.com/apache/flink> does not show where StringToByteSerializer is defined; it appears only in the documentation for this example. I've tried using a SimpleStringSchema, which seems to handle serialization, but this raises an exception when I attempt to run it.


Does anyone have any experience with using a RabbitMQ sink? Any pointers as to what I'm doing wrong?


Thanks,

Paul
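
For readers wondering what a serializer like the one named in the documentation example might do, here is a minimal sketch. It is an assumption, not code from the Flink repository: the SerializationSchema interface below is a local stand-in declared only so the example compiles without Flink on the classpath, and the class body is a guess based on the name StringToByteSerializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for Flink's SerializationSchema interface, declared
// locally so this sketch compiles without Flink on the classpath.
interface SerializationSchema<T> {
    byte[] serialize(T element);
}

// Hypothetical serializer; only the name is borrowed from the documentation example.
class StringToByteSerializer implements SerializationSchema<String> {
    @Override
    public byte[] serialize(String element) {
        // Encode explicitly as UTF-8 rather than relying on the platform default.
        return element.getBytes(StandardCharsets.UTF_8);
    }
}

class SerializerDemo {
    public static void main(String[] args) {
        byte[] bytes = new StringToByteSerializer().serialize("hello");
        System.out.println(Arrays.toString(bytes)); // prints [104, 101, 108, 108, 111]
    }
}
```

In a real job the class would implement Flink's own serialization interface instead of the local stand-in, and would be passed to the sink in place of the third constructor argument shown in the example above.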

Re: Using RabbitMQ Sinks

Posted by Robert Metzger <rm...@apache.org>.
The error you've reported is coming from the RabbitMQ library. I guess the
problem is that the queue already exists, and it was created with
different parameters [1].
Either you override the "protected void setupQueue()" method in Flink's
RMQSink to match the params of the existing queue, or you use a different
queue name.
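
The mechanism can be illustrated with a toy model. This is a rough sketch under stated assumptions, not the RabbitMQ client or Flink's RMQSink: ToyBroker, ToySink and DurableToySink are hypothetical names, the broker only mimics the rule that re-declaring an existing queue needs equivalent parameters, and the choice of "durable" as the mismatching flag is a guess, since the thread does not say which parameter differs.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model, NOT the RabbitMQ client: it only mimics the rule that
// re-declaring an existing queue requires equivalent parameters.
class ToyBroker {
    // Queue name -> flags the queue was first declared with.
    private final Map<String, String> queues = new HashMap<>();

    void queueDeclare(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        String flags = durable + "/" + exclusive + "/" + autoDelete;
        String existing = queues.putIfAbsent(name, flags);
        if (existing != null && !existing.equals(flags)) {
            throw new IllegalStateException(
                "PRECONDITION_FAILED - parameters for queue '" + name + "' not equivalent");
        }
    }
}

// Hypothetical sink that declares its queue in an overridable hook.
class ToySink {
    protected final ToyBroker broker;
    protected final String queueName;

    ToySink(ToyBroker broker, String queueName) {
        this.broker = broker;
        this.queueName = queueName;
    }

    // Default declaration used by this toy sink (an assumption for illustration).
    protected void setupQueue() {
        broker.queueDeclare(queueName, false, false, false);
    }

    void open() {
        setupQueue();
    }
}

// Subclass whose declaration matches a queue that was created as durable.
class DurableToySink extends ToySink {
    DurableToySink(ToyBroker broker, String queueName) {
        super(broker, queueName);
    }

    @Override
    protected void setupQueue() {
        broker.queueDeclare(queueName, true, false, false);
    }
}

class QueueDeclareDemo {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        // Simulate a queue that already exists and was created as durable.
        broker.queueDeclare("flink-sink", true, false, false);

        try {
            new ToySink(broker, "flink-sink").open();
        } catch (IllegalStateException e) {
            System.out.println("default sink failed: " + e.getMessage());
        }

        new DurableToySink(broker, "flink-sink").open();
        System.out.println("matching sink opened");
    }
}
```

The same pattern would apply to a subclass of the real sink: the overriding method declares the queue with whatever flags the existing queue actually has, which can be checked in the RabbitMQ management tools.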

I hope that helps.

[1]
http://rabbitmq.1065348.n5.nabble.com/java-io-IOException-because-of-Parameters-td1581.html

On Mon, Aug 8, 2016 at 5:27 PM, Paul Joireman <pa...@physiq.com>
wrote:

> Robert,
>
>
> It looks like the root cause exception is:
>
>
> com.rabbitmq.client.ShutdownSignalException: channel error; protocol
> method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> The full execution trace is shown below and occurs when I try to execute
> my process.   In the process, I initially create a connection to RMQ using
> the RMQConnectionConfig.Builder(), and this works fine if I read using a
> SimpleStringSchema, but it fails when I re-use the same connection
> configuration for the sink as follows:
>
> msgs.addSink(new RMQSink<String>(rmqConnConfig, ALERT_SINK_QUEUE, new
> SimpleStringSchema()));
>
> Where msgs are Strings.
>
> Paul
>
> Connected to the target VM, address: '127.0.0.1:42208', transport:
> 'socket'
> 10:13:50,555 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>         - class com.physiq.alert.AlertMessageIn is not a valid POJO type
> 10:13:57,035 INFO  org.apache.flink.streaming.api.environment.LocalStreamEnvironment
>  - Running job on local embedded Flink mini cluster
> 10:13:58,100 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster
>         - Starting FlinkMiniCluster.
> 10:13:58,580 INFO  akka.event.slf4j.Slf4jLogger
>        - Slf4jLogger started
> 10:13:58,614 INFO  org.apache.flink.runtime.blob.BlobServer
>        - Created BLOB server storage directory /tmp/blobStore-b19e22df-4636-
> 4e96-bb45-db70c0bcc7e1
> 10:13:58,620 INFO  org.apache.flink.runtime.blob.BlobServer
>        - Started BLOB server at 0.0.0.0:34360 - max concurrent requests:
> 50 - max backlog: 1000
> 10:13:58,635 INFO  org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory
>  - Using job manager savepoint state backend.
> 10:13:58,642 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>         - No metrics reporter configured, no metrics will be
> exposed/reported.
> 10:13:58,655 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist
>         - Started memory archivist akka://flink/user/archive_1
> 10:13:58,657 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Starting JobManager at akka://flink/user/jobmanager_1.
> 10:13:58,678 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
> 10:13:58,683 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Messages between TaskManager and JobManager have a max timeout of
> 10000 milliseconds
> 10:13:58,703 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Temporary file directory '/tmp': total 102 GB, usable 9 GB (8.82%
> usable)
> 10:13:58,844 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
>  - Allocated 64 MB for network buffer pool (number of memory segments:
> 2048, bytes per segment: 32768).
> 10:13:58,846 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Limiting managed memory to 549 MB, memory will be allocated lazily.
> 10:14:00,163 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager
>        - I/O manager uses directory /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b
> for spill files.
> 10:14:00,167 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - JobManager akka://flink/user/jobmanager_1 was granted leadership
> with leader session ID None.
> 10:14:00,239 INFO  org.apache.flink.runtime.filecache.FileCache
>        - User file cache uses directory /tmp/flink-dist-cache-
> 2e8256c3-532c-45b6-ad2d-fa7b43846a6e
> 10:14:00,239 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  - Resource Manager associating with leading JobManager
> Actor[akka://flink/user/jobmanager_1#-1689614413] - leader session null
> 10:14:00,495 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Starting TaskManager actor at akka://flink/user/taskmanager_
> 1#2008243751.
> 10:14:00,496 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - TaskManager data connection information: localhost (dataPort=58335)
> 10:14:00,496 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - TaskManager has 4 task slot(s).
> 10:14:00,498 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Memory usage stats: [HEAP: 77/195/1701 MB, NON HEAP: 22/33/214 MB
> (used/committed/max)]
> 10:14:00,506 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Trying to register at JobManager akka://flink/user/jobmanager_1
> (attempt 1, timeout: 500 milliseconds)
> 10:14:00,508 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  - TaskManager ResourceID{resourceId='8ca61ba2f8741d54ce649bd0d525d50c'}
> has started.
> 10:14:00,511 INFO  org.apache.flink.runtime.instance.InstanceManager
>         - Registered TaskManager at localhost (akka://flink/user/taskmanager_1)
> as db51cf9997f8059543464810ffaffea3. Current number of registered hosts
> is 1. Current number of alive task slots is 4.
> 10:14:00,517 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Successful registration at JobManager
> (akka://flink/user/jobmanager_1), starting network stack and library
> cache.
> 10:14:00,534 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Determined BLOB server address to be localhost/127.0.0.1:34360.
> Starting BLOB cache.
> 10:14:00,535 INFO  org.apache.flink.runtime.blob.BlobCache
>         - Created BLOB cache storage directory /tmp/blobStore-1a9647ad-11f0-
> 40de-b24c-0e2c57c030d0
> 10:14:00,535 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>         - No metrics reporter configured, no metrics will be
> exposed/reported.
> 10:14:00,549 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Received job Flink Streaming Job (6d641f666b26088a843355ff3b0b17
> 05).
> 10:14:00,549 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Could not submit job Flink Streaming Job (
> 6d641f666b26088a843355ff3b0b1705), because there is no connection to a
> JobManager.
> 10:14:00,551 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Disconnect from JobManager null.
> 10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Connect to JobManager Actor[akka://flink/user/
> jobmanager_1#-1689614413].
> 10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Connected to new JobManager akka://flink/user/jobmanager_1.
> 10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Sending message to JobManager akka://flink/user/jobmanager_1 to
> submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705) and
> wait for progress
> 10:14:00,558 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Upload jar files to job manager akka://flink/user/jobmanager_1.
> 10:14:00,560 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Submit job to the job manager akka://flink/user/jobmanager_1.
> 10:14:00,563 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Submitting job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming
> Job).
> 10:14:00,568 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Using restart strategy NoRestartStrategy for
> 6d641f666b26088a843355ff3b0b1705.
> 10:14:00,628 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Scheduling job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming
> Job).
> 10:14:00,628 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Job was successfully submitted to the JobManager
> akka://flink/user/jobmanager_1.
> 10:14:00,631 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e)
> switched from CREATED to SCHEDULED
> 10:14:00,631 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Job execution switched to status RUNNING.
> 08/08/2016 10:14:00 Job execution switched to status RUNNING.
> 10:14:00,632 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to
> SCHEDULED
> 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
> 10:14:00,633 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming
> Job) changed to RUNNING.
> 10:14:00,637 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e)
> switched from SCHEDULED to DEPLOYING
> 10:14:00,638 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to
> DEPLOYING
> 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
> 10:14:00,638 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Source: Custom Source (1/1) (attempt #0) to localhost
> 10:14:00,642 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from
> CREATED to SCHEDULED
> 10:14:00,642 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
> 08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
> 10:14:00,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from
> SCHEDULED to DEPLOYING
> 10:14:00,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Map (1/4) (attempt #0) to localhost
> 10:14:00,643 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
> 08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
> 10:14:00,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from
> CREATED to SCHEDULED
> 10:14:00,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from
> SCHEDULED to DEPLOYING
> 10:14:00,649 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
> 08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
> 10:14:00,649 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
> 08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
> 10:14:00,649 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Map (2/4) (attempt #0) to localhost
> 10:14:00,650 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from
> CREATED to SCHEDULED
> 10:14:00,650 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
> 10:14:00,650 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from
> SCHEDULED to DEPLOYING
> 08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
> 10:14:00,655 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Map (3/4) (attempt #0) to localhost
> 10:14:00,656 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from
> CREATED to SCHEDULED
> 10:14:00,657 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from
> SCHEDULED to DEPLOYING
> 10:14:00,657 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
> 08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
> 10:14:00,659 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Map (4/4) (attempt #0) to localhost
> 10:14:00,659 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
> 08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
> 10:14:00,660 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
> 08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
> 10:14:00,660 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (1/4) (
> b3a3e69c7dd082fc744b6ec791697e35) switched from CREATED to SCHEDULED
> 10:14:00,660 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched
> to SCHEDULED
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to
> SCHEDULED
> 10:14:00,660 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (1/4) (
> b3a3e69c7dd082fc744b6ec791697e35) switched from SCHEDULED to DEPLOYING
> 10:14:00,661 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched
> to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to
> DEPLOYING
> 10:14:00,661 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Filter -> Map -> Sink: Unnamed (1/4) (attempt #0) to
> localhost
> 10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (2/4) (
> 54a627d8091077e6742cd74d754016a7) switched from CREATED to SCHEDULED
> 10:14:00,662 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched
> to SCHEDULED
> 10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (2/4) (
> 54a627d8091077e6742cd74d754016a7) switched from SCHEDULED to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to
> SCHEDULED
> 10:14:00,662 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched
> to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to
> DEPLOYING
> 10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Filter -> Map -> Sink: Unnamed (2/4) (attempt #0) to
> localhost
> 10:14:00,663 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (3/4) (
> dafdfde1351e3eb9593fa227e8255b57) switched from CREATED to SCHEDULED
> 10:14:00,664 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched
> to SCHEDULED
> 10:14:00,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (3/4) (
> dafdfde1351e3eb9593fa227e8255b57) switched from SCHEDULED to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to
> SCHEDULED
> 10:14:00,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Filter -> Map -> Sink: Unnamed (3/4) (attempt #0) to
> localhost
> 10:14:00,664 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched
> to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to
> DEPLOYING
> 10:14:00,665 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (4/4) (
> 7c219dda43bc53571d306fc0df4468fa) switched from CREATED to SCHEDULED
> 10:14:00,666 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched
> to SCHEDULED
> 10:14:00,666 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (4/4) (
> 7c219dda43bc53571d306fc0df4468fa) switched from SCHEDULED to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to
> SCHEDULED
> 10:14:00,666 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Filter -> Map -> Sink: Unnamed (4/4) (attempt #0) to
> localhost
> 10:14:00,666 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched
> to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to
> DEPLOYING
> 10:14:00,674 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Source: Custom Source (1/1)
> 10:14:00,674 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Loading JAR files for task Source: Custom Source (1/1)
> 10:14:00,678 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Map (1/4)
> 10:14:00,678 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Loading JAR files for task Map (1/4)
> 10:14:00,683 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Map (2/4)
> 10:14:00,684 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Loading JAR files for task Map (2/4)
> 10:14:00,684 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Map (3/4)
> 10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Registering task at network: Source: Custom Source (1/1)
> [DEPLOYING]
> 10:14:00,692 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Registering task at network: Map (2/4) [DEPLOYING]
> 10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Loading JAR files for task Map (3/4)
> 10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Map (4/4)
> 10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Registering task at network: Map (1/4) [DEPLOYING]
> 10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Filter -> Map -> Sink: Unnamed (1/4)
> 10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Registering task at network: Map (3/4) [DEPLOYING]
> 10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Filter -> Map -> Sink: Unnamed (2/4)
> 10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Loading JAR files for task Map (4/4)
> 10:14:00,696 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Loading JAR files for task Filter -> Map -> Sink: Unnamed (1/4)
> 10:14:00,703 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (2/4) switched to RUNNING
> 10:14:00,704 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Registering task at network: Filter -> Map -> Sink: Unnamed (1/4)
> [DEPLOYING]
> 10:14:00,701 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Source: Custom Source (1/1) switched to RUNNING
> 10:14:00,698 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Loading JAR files for task Filter -> Map -> Sink: Unnamed (2/4)
> 10:14:00,709 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Registering task at network: Filter -> Map -> Sink: Unnamed (2/4)
> [DEPLOYING]
> 10:14:00,699 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Filter -> Map -> Sink: Unnamed (3/4)
> 10:14:00,710 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Loading JAR files for task Filter -> Map -> Sink: Unnamed (3/4)
> 10:14:00,698 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Registering task at network: Map (4/4) [DEPLOYING]
> 10:14:00,711 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Filter -> Map -> Sink: Unnamed (4/4)
> 10:14:00,710 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (3/4) switched to RUNNING
> 10:14:00,715 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (1/4) switched to RUNNING
> 10:14:00,719 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Loading JAR files for task Filter -> Map -> Sink: Unnamed (4/4)
> 10:14:00,724 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Filter -> Map -> Sink: Unnamed (1/4) switched to RUNNING
> 10:14:00,723 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Filter -> Map -> Sink: Unnamed (2/4) switched to RUNNING
> 10:14:00,720 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (4/4) switched to RUNNING
> 10:14:00,718 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from
> DEPLOYING to RUNNING
> 10:14:00,725 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e)
> switched from DEPLOYING to RUNNING
> 10:14:00,725 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(2/4) switched to RUNNING
> 08/08/2016 10:14:00 Map(2/4) switched to RUNNING
> 10:14:00,726 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to
> RUNNING
> 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
> 10:14:00,730 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from
> DEPLOYING to RUNNING
> 10:14:00,730 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(3/4) switched to RUNNING
> 08/08/2016 10:14:00 Map(3/4) switched to RUNNING
> 10:14:00,732 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from
> DEPLOYING to RUNNING
> 10:14:00,732 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(1/4) switched to RUNNING
> 08/08/2016 10:14:00 Map(1/4) switched to RUNNING
> 10:14:00,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (1/4) (
> b3a3e69c7dd082fc744b6ec791697e35) switched from DEPLOYING to RUNNING
> 10:14:00,724 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Registering task at network: Filter -> Map -> Sink: Unnamed (3/4)
> [DEPLOYING]
> 10:14:00,736 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Registering task at network: Filter -> Map -> Sink: Unnamed (4/4)
> [DEPLOYING]
> 10:14:00,736 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched
> to RUNNING
> 10:14:00,750 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Filter -> Map -> Sink: Unnamed (3/4) switched to RUNNING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to
> RUNNING
> 10:14:00,750 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (2/4) (
> 54a627d8091077e6742cd74d754016a7) switched from DEPLOYING to RUNNING
> 10:14:00,751 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from
> DEPLOYING to RUNNING
> 10:14:00,751 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched
> to RUNNING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to
> RUNNING
> 10:14:00,752 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Map(4/4) switched to RUNNING
> 08/08/2016 10:14:00 Map(4/4) switched to RUNNING
> 10:14:00,751 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (3/4) (
> dafdfde1351e3eb9593fa227e8255b57) switched from DEPLOYING to RUNNING
> 10:14:00,757 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Filter -> Map -> Sink: Unnamed (4/4) switched to RUNNING
> 10:14:00,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (4/4) (
> 7c219dda43bc53571d306fc0df4468fa) switched from DEPLOYING to RUNNING
> 10:14:00,756 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,766 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,765 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched
> to RUNNING
> 10:14:00,771 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - State backend is set to heap memory (checkpoint to jobmanager)
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to
> RUNNING
> 10:14:00,771 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,776 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched
> to RUNNING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to
> RUNNING
> 10:14:00,782 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,782 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,787 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,787 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,789 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,790 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,791 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,792 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,792 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,793 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,792 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,793 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,794 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,794 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,796 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,796 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,796 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,796 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,804 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,804 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,806 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
> 10:14:00,806 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
> 10:14:00,807 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
> 10:14:00,807 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,808 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
> 10:14:00,808 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
> 10:14:00,809 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:01,032 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 10:14:01,032 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 10:14:01,034 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 10:14:01,034 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator
>  - Exception while closing user function while failing or canceling task
> com.rabbitmq.client.AlreadyClosedException: channel is already closed due
> to channel error; protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(
> AMQChannel.java:265)
> at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(
> ChannelN.java:261)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> close(RMQSink.java:114)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> closeFunction(FunctionUtils.java:45)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> dispose(AbstractUdfStreamOperator.java:107)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> disposeAllOperators(StreamTask.java:426)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:332)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> 10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator
>  - Exception while closing user function while failing or canceling task
> com.rabbitmq.client.AlreadyClosedException: channel is already closed due
> to channel error; protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(
> AMQChannel.java:265)
> at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(
> ChannelN.java:261)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> close(RMQSink.java:114)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> closeFunction(FunctionUtils.java:45)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> dispose(AbstractUdfStreamOperator.java:107)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> disposeAllOperators(StreamTask.java:426)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:332)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> 10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator
>  - Exception while closing user function while failing or canceling task
> com.rabbitmq.client.AlreadyClosedException: channel is already closed due
> to channel error; protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(
> AMQChannel.java:265)
> at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(
> ChannelN.java:261)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> close(RMQSink.java:114)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> closeFunction(FunctionUtils.java:45)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> dispose(AbstractUdfStreamOperator.java:107)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> disposeAllOperators(StreamTask.java:426)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:332)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> 10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task
>         - Task execution failed.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator
>  - Exception while closing user function while failing or canceling task
> com.rabbitmq.client.AlreadyClosedException: channel is already closed due
> to channel error; protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(
> AMQChannel.java:265)
> at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(
> ChannelN.java:261)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> close(RMQSink.java:114)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> closeFunction(FunctionUtils.java:45)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> dispose(AbstractUdfStreamOperator.java:107)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> disposeAllOperators(StreamTask.java:426)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:332)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> 10:14:01,041 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Filter -> Map -> Sink: Unnamed (3/4) switched to FAILED with
> exception.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task
>         - Task execution failed.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,043 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Filter -> Map -> Sink: Unnamed (2/4) switched to FAILED with
> exception.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,039 ERROR org.apache.flink.runtime.taskmanager.Task
>         - Task execution failed.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,041 ERROR org.apache.flink.runtime.taskmanager.Task
>         - Task execution failed.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,045 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Filter -> Map -> Sink: Unnamed (1/4) switched to FAILED with
> exception.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,046 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Filter -> Map -> Sink: Unnamed (4/4) switched to FAILED with
> exception.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,057 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (4/4)
> 10:14:01,058 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (1/4)
> 10:14:01,059 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (2/4)
> 10:14:01,063 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (3/4)
> 10:14:01,074 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (7c219dda43bc53571d306fc0df4468fa)
> 10:14:01,078 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (dafdfde1351e3eb9593fa227e8255b57)
> 10:14:01,080 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (54a627d8091077e6742cd74d754016a7)
> 10:14:01,082 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (b3a3e69c7dd082fc744b6ec791697e35)
> 10:14:01,085 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from RUNNING to FAILED
> 10:14:01,085 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from RUNNING to FAILED
> 10:14:01,086 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from RUNNING to FAILED
> 10:14:01,086 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched
> to FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched to
> FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 10:14:01,087 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched
> to FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched to
> FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 10:14:01,088 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched
> to FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched to
> FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 10:14:01,089 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Filter -> Map -> Sink: Unnamed (1/4) (
> b3a3e69c7dd082fc744b6ec791697e35) switched from RUNNING to FAILED
> 10:14:01,090 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched
> to FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched to
> FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 10:14:01,092 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming
> Job) changed to FAILING.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,092 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Job execution switched to status FAILING.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,092 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e)
> switched from RUNNING to CANCELING
> 08/08/2016 10:14:01 Job execution switched to status FAILING.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,093 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to
> CANCELING
> 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
> 10:14:01,095 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Attempting to cancel task Source: Custom Source (1/1)
> 10:14:01,096 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Source: Custom Source (1/1) switched to CANCELING
> 10:14:01,096 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Triggering cancellation of task code Source: Custom Source (1/1) (
> 9ffba0e9f84adca163121d50f88e519e).
> 10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from
> RUNNING to CANCELING
> 10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from
> RUNNING to CANCELING
> 10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from
> RUNNING to CANCELING
> 10:14:01,098 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from
> RUNNING to CANCELING
> 10:14:01,098 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Map(1/4) switched to CANCELING
> 08/08/2016 10:14:01 Map(1/4) switched to CANCELING
> 10:14:01,098 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Map(2/4) switched to CANCELING
> 08/08/2016 10:14:01 Map(2/4) switched to CANCELING
> 10:14:01,099 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Map(3/4) switched to CANCELING
> 08/08/2016 10:14:01 Map(3/4) switched to CANCELING
> 10:14:01,100 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Map(4/4) switched to CANCELING
> 08/08/2016 10:14:01 Map(4/4) switched to CANCELING
> 10:14:01,107 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - Timer service is shutting down.
> 10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Attempting to cancel task Map (1/4)
> 10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (1/4) switched to CANCELING
> 10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Triggering cancellation of task code Map (1/4) (
> 4a13efc94d64d61532106fbd9bdfaedb).
> 10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Attempting to cancel task Map (2/4)
> 10:14:01,110 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - Timer service is shutting down.
> 10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (2/4) switched to CANCELING
> 10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Triggering cancellation of task code Map (2/4) (
> 99136f14b2e32b44318b49b2ad39dde5).
> 10:14:01,111 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - Timer service is shutting down.
> 10:14:01,111 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Attempting to cancel task Map (3/4)
> 10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (3/4) switched to CANCELING
> 10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Triggering cancellation of task code Map (3/4) (
> 029bc351981b5056208839ce988f0f3f).
> 10:14:01,112 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - Timer service is shutting down.
> 10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Attempting to cancel task Map (4/4)
> 10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (4/4) switched to CANCELING
> 10:14:01,113 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Triggering cancellation of task code Map (4/4) (
> ae5e9324d04ffac7843317749a2e86dd).
> 10:14:01,114 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (1/4) switched to CANCELED
> 10:14:01,114 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Freeing task resources for Map (1/4)
> 10:14:01,116 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Discarding the results produced by task execution
> b3a3e69c7dd082fc744b6ec791697e35
> 10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (2/4) switched to CANCELED
> 10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Discarding the results produced by task execution
> 54a627d8091077e6742cd74d754016a7
> 10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (3/4) switched to CANCELED
> 10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Freeing task resources for Map (2/4)
> 10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Freeing task resources for Map (3/4)
> 10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Discarding the results produced by task execution
> dafdfde1351e3eb9593fa227e8255b57
> 10:14:01,119 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Discarding the results produced by task execution
> 7c219dda43bc53571d306fc0df4468fa
> 10:14:01,119 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state CANCELED to
> JobManager for task Map (4a13efc94d64d61532106fbd9bdfaedb)
> 10:14:01,120 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state CANCELED to
> JobManager for task Map (99136f14b2e32b44318b49b2ad39dde5)
> 10:14:01,120 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state CANCELED to
> JobManager for task Map (029bc351981b5056208839ce988f0f3f)
> 10:14:01,121 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from
> CANCELING to CANCELED
> 10:14:01,121 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from
> CANCELING to CANCELED
> 10:14:01,121 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>         - Timer service is shutting down.
> 10:14:01,122 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from
> CANCELING to CANCELED
> 10:14:01,122 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Map(1/4) switched to CANCELED
> 08/08/2016 10:14:01 Map(1/4) switched to CANCELED
> 10:14:01,123 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Map(2/4) switched to CANCELED
> 08/08/2016 10:14:01 Map(2/4) switched to CANCELED
> 10:14:01,123 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Map(3/4) switched to CANCELED
> 10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Map (4/4) switched to CANCELED
> 08/08/2016 10:14:01 Map(3/4) switched to CANCELED
> 10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Freeing task resources for Map (4/4)
> 10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state CANCELED to
> JobManager for task Map (ae5e9324d04ffac7843317749a2e86dd)
> 10:14:01,131 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from
> CANCELING to CANCELED
> 10:14:01,132 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Map(4/4) switched to CANCELED
> 08/08/2016 10:14:01 Map(4/4) switched to CANCELED
> 10:14:01,147 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Source: Custom Source (1/1) switched to CANCELED
> 10:14:01,148 INFO  org.apache.flink.runtime.taskmanager.Task
>         - Freeing task resources for Source: Custom Source (1/1)
> 10:14:01,148 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state CANCELED to
> JobManager for task Source: Custom Source (9ffba0e9f84adca163121d50f88e519e)
> 10:14:01,149 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e)
> switched from CANCELING to CANCELED
> 10:14:01,150 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to
> CANCELED
> 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
> 10:14:01,154 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming
> Job) changed to FAILED.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,154 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 08/08/2016 10:14:01 Job execution switched to status FAILED.
> 08/08/2016 10:14:01 Job execution switched to status FAILED.
> 10:14:01,160 INFO  org.apache.flink.runtime.client.JobClient
>         - Job execution failed
> 10:14:01,160 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Terminate JobClientActor.
> 10:14:01,161 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster
>         - Stopping FlinkMiniCluster.
> 10:14:01,161 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Disconnect from JobManager Actor[akka://flink/user/
> jobmanager_1#-1689614413].
> 10:14:01,174 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Stopping JobManager akka://flink/user/jobmanager_1.
> 10:14:01,184 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Stopping TaskManager akka://flink/user/taskmanager_1#2008243751.
> 10:14:01,184 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Disassociating from JobManager
> 10:14:01,192 INFO  org.apache.flink.runtime.blob.BlobCache
>         - Shutting down BlobCache
> 10:14:01,194 INFO  org.apache.flink.runtime.blob.BlobServer
>        - Stopped BLOB server at 0.0.0.0:34360
> 10:14:01,208 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager
>        - I/O manager removed spill file directory
> /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b
> 10:14:01,212 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Task manager akka://flink/user/taskmanager_1 is completely shut
> down.
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException:
> Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$
> mcV$sp(JobManager.scala:822)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
>
> ------------------------------
> *From:* Robert Metzger <rm...@apache.org>
> *Sent:* Monday, August 8, 2016 9:48:39 AM
> *To:* user@flink.apache.org
> *Subject:* Re: Using RabbitMQ Sinks
>
> Hi Paul,
>
> The example in the code is outdated; StringToByteSerializer was probably
> removed quite a while ago. I'll update the documentation once we have
> figured out the other problem you reported.
> What's the exception you are getting?
>
> Regards,
> Robert
>
> On Mon, Aug 8, 2016 at 4:33 PM, Paul Joireman <pa...@physiq.com>
> wrote:
>
>> Hi all,
>>
>>
>> The documentation describing the use of RabbitMQ as a sink gives the
>> following example:
>>
>>
>> RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
>> .setHost("localhost").setPort(5000).setUserName(..)
>> .setPassword(..).setVirtualHost("/").build();
>> stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer()));
>>
>> However, a search of the flink github mirrored repo
>> <https://github.com/apache/flink> does not show where
>> StringToByteSerializer is defined and only shows it being used in the
>> documentation of this example. I've tried using a SimpleStringSchema,
>> which seems to handle serialization, but this raises an exception when
>> I attempt to run it.
>>
>>
>> Does anyone have any experience with using a RabbitMQ sink? Any
>> pointers as to what I'm doing wrong?
>>
>>
>> Thanks,
>>
>> Paul
>>
>>
>

Re: Using RabbitMQ Sinks

Posted by Paul Joireman <pa...@physiq.com>.
Robert,


It looks like the root cause exception is:


com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
The full execution trace is shown below; it occurs when I try to execute my process. In the process, I initially create a connection to RMQ using RMQConnectionConfig.Builder(), and this works fine when I read using a SimpleStringSchema, but the exception is raised when I re-use the same connection configuration for the sink as follows:

msgs.addSink(new RMQSink<String>(rmqConnConfig, ALERT_SINK_QUEUE, new SimpleStringSchema()));

Where msgs are Strings.

Paul
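
The 406 PRECONDITION_FAILED reply quoted above is what an AMQP broker sends when a queue that already exists is declared again with flags that differ from the ones it was created with. A minimal sketch of that equivalence check follows, using only the Java standard library. The class QueueDeclareSketch, its QueueParams type, and the flag values are hypothetical stand-ins chosen for illustration; they are not the RabbitMQ client or Flink API, and the trace does not say which flag actually differs for 'flink-sink'.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a broker's queue registry (hypothetical, not the RabbitMQ API). */
final class QueueDeclareSketch {

    /** The boolean flags carried by a queue declaration (arguments map omitted). */
    static final class QueueParams {
        final boolean durable;
        final boolean exclusive;
        final boolean autoDelete;

        QueueParams(boolean durable, boolean exclusive, boolean autoDelete) {
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
        }

        /** Two declarations are equivalent only if every flag matches. */
        boolean sameAs(QueueParams other) {
            return durable == other.durable
                    && exclusive == other.exclusive
                    && autoDelete == other.autoDelete;
        }
    }

    private final Map<String, QueueParams> queues = new HashMap<>();

    /**
     * Returns true if the queue was created or redeclared with equivalent
     * flags, false if the flags conflict with the existing queue (the case
     * a real broker answers with 406 PRECONDITION_FAILED).
     */
    boolean declare(String name, QueueParams params) {
        QueueParams existing = queues.putIfAbsent(name, params);
        return existing == null || existing.sameAs(params);
    }

    public static void main(String[] args) {
        QueueDeclareSketch broker = new QueueDeclareSketch();
        // Assumption for illustration: the queue was first created as durable.
        boolean created = broker.declare("flink-sink", new QueueParams(true, false, false));
        // A second declaration with durable=false no longer matches.
        boolean redeclared = broker.declare("flink-sink", new QueueParams(false, false, false));
        System.out.println("first declare accepted: " + created);
        System.out.println("mismatched redeclare accepted: " + redeclared);
    }
}
```

Under this model, a declaration is accepted either when it repeats the original flags or when it uses a queue name that does not exist yet.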

Connected to the target VM, address: '127.0.0.1:42208', transport: 'socket'
10:13:50,555 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class com.physiq.alert.AlertMessageIn is not a valid POJO type
10:13:57,035 INFO  org.apache.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
10:13:58,100 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster.
10:13:58,580 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
10:13:58,614 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-b19e22df-4636-4e96-bb45-db70c0bcc7e1
10:13:58,620 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:34360 - max concurrent requests: 50 - max backlog: 1000
10:13:58,635 INFO  org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory  - Using job manager savepoint state backend.
10:13:58,642 INFO  org.apache.flink.runtime.metrics.MetricRegistry               - No metrics reporter configured, no metrics will be exposed/reported.
10:13:58,655 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist           - Started memory archivist akka://flink/user/archive_1
10:13:58,657 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Starting JobManager at akka://flink/user/jobmanager_1.
10:13:58,678 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
10:13:58,683 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages between TaskManager and JobManager have a max timeout of 10000 milliseconds
10:13:58,703 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Temporary file directory '/tmp': total 102 GB, usable 9 GB (8.82% usable)
10:13:58,844 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
10:13:58,846 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Limiting managed memory to 549 MB, memory will be allocated lazily.
10:14:00,163 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b for spill files.
10:14:00,167 INFO  org.apache.flink.runtime.jobmanager.JobManager                - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID None.
10:14:00,239 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /tmp/flink-dist-cache-2e8256c3-532c-45b6-ad2d-fa7b43846a6e
10:14:00,239 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1689614413] - leader session null
10:14:00,495 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor at akka://flink/user/taskmanager_1#2008243751.
10:14:00,496 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager data connection information: localhost (dataPort=58335)
10:14:00,496 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager has 4 task slot(s).
10:14:00,498 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 77/195/1701 MB, NON HEAP: 22/33/214 MB (used/committed/max)]
10:14:00,506 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
10:14:00,508 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - TaskManager ResourceID{resourceId='8ca61ba2f8741d54ce649bd0d525d50c'} has started.
10:14:00,511 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as db51cf9997f8059543464810ffaffea3. Current number of registered hosts is 1. Current number of alive task slots is 4.
10:14:00,517 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
10:14:00,534 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Determined BLOB server address to be localhost/127.0.0.1:34360. Starting BLOB cache.
10:14:00,535 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-1a9647ad-11f0-40de-b24c-0e2c57c030d0
10:14:00,535 INFO  org.apache.flink.runtime.metrics.MetricRegistry               - No metrics reporter configured, no metrics will be exposed/reported.
10:14:00,549 INFO  org.apache.flink.runtime.client.JobClientActor                - Received job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705).
10:14:00,549 INFO  org.apache.flink.runtime.client.JobClientActor                - Could not submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705), because there is no connection to a JobManager.
10:14:00,551 INFO  org.apache.flink.runtime.client.JobClientActor                - Disconnect from JobManager null.
10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor                - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1689614413].
10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor                - Connected to new JobManager akka://flink/user/jobmanager_1.
10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor                - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705) and wait for progress
10:14:00,558 INFO  org.apache.flink.runtime.client.JobClientActor                - Upload jar files to job manager akka://flink/user/jobmanager_1.
10:14:00,560 INFO  org.apache.flink.runtime.client.JobClientActor                - Submit job to the job manager akka://flink/user/jobmanager_1.
10:14:00,563 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Submitting job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job).
10:14:00,568 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Using restart strategy NoRestartStrategy for 6d641f666b26088a843355ff3b0b1705.
10:14:00,628 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Scheduling job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job).
10:14:00,628 INFO  org.apache.flink.runtime.client.JobClientActor                - Job was successfully submitted to the JobManager akka://flink/user/jobmanager_1.
10:14:00,631 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from CREATED to SCHEDULED
10:14:00,631 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Job execution switched to status RUNNING.
08/08/2016 10:14:00 Job execution switched to status RUNNING.
10:14:00,632 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
10:14:00,633 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to RUNNING.
10:14:00,637 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from SCHEDULED to DEPLOYING
10:14:00,638 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
10:14:00,638 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source (1/1) (attempt #0) to localhost
10:14:00,642 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from CREATED to SCHEDULED
10:14:00,642 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
10:14:00,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from SCHEDULED to DEPLOYING
10:14:00,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map (1/4) (attempt #0) to localhost
10:14:00,643 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
10:14:00,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from CREATED to SCHEDULED
10:14:00,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from SCHEDULED to DEPLOYING
10:14:00,649 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
10:14:00,649 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
10:14:00,649 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map (2/4) (attempt #0) to localhost
10:14:00,650 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from CREATED to SCHEDULED
10:14:00,650 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
10:14:00,650 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
10:14:00,655 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map (3/4) (attempt #0) to localhost
10:14:00,656 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from CREATED to SCHEDULED
10:14:00,657 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from SCHEDULED to DEPLOYING
10:14:00,657 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
10:14:00,659 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map (4/4) (attempt #0) to localhost
10:14:00,659 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
10:14:00,660 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
10:14:00,660 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from CREATED to SCHEDULED
10:14:00,660 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to SCHEDULED
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to SCHEDULED
10:14:00,660 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from SCHEDULED to DEPLOYING
10:14:00,661 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to DEPLOYING
10:14:00,661 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Filter -> Map -> Sink: Unnamed (1/4) (attempt #0) to localhost
10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from CREATED to SCHEDULED
10:14:00,662 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10:14:00,662 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to DEPLOYING
10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Filter -> Map -> Sink: Unnamed (2/4) (attempt #0) to localhost
10:14:00,663 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from CREATED to SCHEDULED
10:14:00,664 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10:14:00,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10:14:00,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Filter -> Map -> Sink: Unnamed (3/4) (attempt #0) to localhost
10:14:00,664 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to DEPLOYING
10:14:00,665 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from CREATED to SCHEDULED
10:14:00,666 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10:14:00,666 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10:14:00,666 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Filter -> Map -> Sink: Unnamed (4/4) (attempt #0) to localhost
10:14:00,666 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to DEPLOYING
10:14:00,674 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Source: Custom Source (1/1)
10:14:00,674 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Source: Custom Source (1/1)
10:14:00,678 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Map (1/4)
10:14:00,678 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Map (1/4)
10:14:00,683 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Map (2/4)
10:14:00,684 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Map (2/4)
10:14:00,684 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Map (3/4)
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Source: Custom Source (1/1) [DEPLOYING]
10:14:00,692 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Map (2/4) [DEPLOYING]
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Map (3/4)
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Map (4/4)
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Map (1/4) [DEPLOYING]
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Filter -> Map -> Sink: Unnamed (1/4)
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Map (3/4) [DEPLOYING]
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Filter -> Map -> Sink: Unnamed (2/4)
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Map (4/4)
10:14:00,696 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (1/4)
10:14:00,703 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (2/4) switched to RUNNING
10:14:00,704 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (1/4) [DEPLOYING]
10:14:00,701 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) switched to RUNNING
10:14:00,698 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (2/4)
10:14:00,709 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (2/4) [DEPLOYING]
10:14:00,699 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Filter -> Map -> Sink: Unnamed (3/4)
10:14:00,710 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (3/4)
10:14:00,698 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Map (4/4) [DEPLOYING]
10:14:00,711 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Filter -> Map -> Sink: Unnamed (4/4)
10:14:00,710 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (3/4) switched to RUNNING
10:14:00,715 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (1/4) switched to RUNNING
10:14:00,719 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (4/4)
10:14:00,724 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (1/4) switched to RUNNING
10:14:00,723 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (2/4) switched to RUNNING
10:14:00,720 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (4/4) switched to RUNNING
10:14:00,718 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from DEPLOYING to RUNNING
10:14:00,725 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from DEPLOYING to RUNNING
10:14:00,725 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(2/4) switched to RUNNING
08/08/2016 10:14:00 Map(2/4) switched to RUNNING
10:14:00,726 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
10:14:00,730 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from DEPLOYING to RUNNING
10:14:00,730 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(3/4) switched to RUNNING
08/08/2016 10:14:00 Map(3/4) switched to RUNNING
10:14:00,732 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from DEPLOYING to RUNNING
10:14:00,732 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(1/4) switched to RUNNING
08/08/2016 10:14:00 Map(1/4) switched to RUNNING
10:14:00,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from DEPLOYING to RUNNING
10:14:00,724 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (3/4) [DEPLOYING]
10:14:00,736 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (4/4) [DEPLOYING]
10:14:00,736 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to RUNNING
10:14:00,750 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (3/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to RUNNING
10:14:00,750 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from DEPLOYING to RUNNING
10:14:00,751 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from DEPLOYING to RUNNING
10:14:00,751 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to RUNNING
10:14:00,752 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(4/4) switched to RUNNING
08/08/2016 10:14:00 Map(4/4) switched to RUNNING
10:14:00,751 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from DEPLOYING to RUNNING
10:14:00,757 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (4/4) switched to RUNNING
10:14:00,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from DEPLOYING to RUNNING
10:14:00,756 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,766 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,765 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to RUNNING
10:14:00,771 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to RUNNING
10:14:00,771 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,776 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to RUNNING
10:14:00,782 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,782 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,787 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,787 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,789 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,790 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,791 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,792 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,792 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,793 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,792 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,793 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,794 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,794 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,796 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,796 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,796 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,796 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,804 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,804 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,806 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,806 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,807 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,807 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,808 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,808 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,809 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:01,032 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,032 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,034 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,034 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,041 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (3/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,043 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (2/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,039 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,041 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,045 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (1/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,046 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (4/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,057 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (4/4)
10:14:01,058 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (1/4)
10:14:01,059 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (2/4)
10:14:01,063 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (3/4)
10:14:01,074 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (7c219dda43bc53571d306fc0df4468fa)
10:14:01,078 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (dafdfde1351e3eb9593fa227e8255b57)
10:14:01,080 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (54a627d8091077e6742cd74d754016a7)
10:14:01,082 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (b3a3e69c7dd082fc744b6ec791697e35)
10:14:01,085 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from RUNNING to FAILED
10:14:01,085 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from RUNNING to FAILED
10:14:01,086 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from RUNNING to FAILED
10:14:01,086 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,087 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,088 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,089 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from RUNNING to FAILED
10:14:01,090 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,092 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to FAILING.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,092 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Job execution switched to status FAILING.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,092 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from RUNNING to CANCELING
08/08/2016 10:14:01 Job execution switched to status FAILING.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,093 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
10:14:01,095 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Source: Custom Source (1/1)
10:14:01,096 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) switched to CANCELING
10:14:01,096 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e).
10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from RUNNING to CANCELING
10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from RUNNING to CANCELING
10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from RUNNING to CANCELING
10:14:01,098 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from RUNNING to CANCELING
10:14:01,098 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(1/4) switched to CANCELING
08/08/2016 10:14:01 Map(1/4) switched to CANCELING
10:14:01,098 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(2/4) switched to CANCELING
08/08/2016 10:14:01 Map(2/4) switched to CANCELING
10:14:01,099 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(3/4) switched to CANCELING
08/08/2016 10:14:01 Map(3/4) switched to CANCELING
10:14:01,100 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(4/4) switched to CANCELING
08/08/2016 10:14:01 Map(4/4) switched to CANCELING
10:14:01,107 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Map (1/4)
10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (1/4) switched to CANCELING
10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb).
10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Map (2/4)
10:14:01,110 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (2/4) switched to CANCELING
10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Map (2/4) (99136f14b2e32b44318b49b2ad39dde5).
10:14:01,111 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,111 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Map (3/4)
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (3/4) switched to CANCELING
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Map (3/4) (029bc351981b5056208839ce988f0f3f).
10:14:01,112 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Map (4/4)
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (4/4) switched to CANCELING
10:14:01,113 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Map (4/4) (ae5e9324d04ffac7843317749a2e86dd).
10:14:01,114 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (1/4) switched to CANCELED
10:14:01,114 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Map (1/4)
10:14:01,116 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution b3a3e69c7dd082fc744b6ec791697e35
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (2/4) switched to CANCELED
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution 54a627d8091077e6742cd74d754016a7
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (3/4) switched to CANCELED
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Map (2/4)
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Map (3/4)
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution dafdfde1351e3eb9593fa227e8255b57
10:14:01,119 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution 7c219dda43bc53571d306fc0df4468fa
10:14:01,119 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Map (4a13efc94d64d61532106fbd9bdfaedb)
10:14:01,120 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Map (99136f14b2e32b44318b49b2ad39dde5)
10:14:01,120 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Map (029bc351981b5056208839ce988f0f3f)
10:14:01,121 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from CANCELING to CANCELED
10:14:01,121 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from CANCELING to CANCELED
10:14:01,121 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,122 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from CANCELING to CANCELED
10:14:01,122 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(1/4) switched to CANCELED
08/08/2016 10:14:01 Map(1/4) switched to CANCELED
10:14:01,123 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(2/4) switched to CANCELED
08/08/2016 10:14:01 Map(2/4) switched to CANCELED
10:14:01,123 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(3/4) switched to CANCELED
10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (4/4) switched to CANCELED
08/08/2016 10:14:01 Map(3/4) switched to CANCELED
10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Map (4/4)
10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Map (ae5e9324d04ffac7843317749a2e86dd)
10:14:01,131 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from CANCELING to CANCELED
10:14:01,132 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(4/4) switched to CANCELED
08/08/2016 10:14:01 Map(4/4) switched to CANCELED
10:14:01,147 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) switched to CANCELED
10:14:01,148 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/1)
10:14:01,148 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source (9ffba0e9f84adca163121d50f88e519e)
10:14:01,149 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from CANCELING to CANCELED
10:14:01,150 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
10:14:01,154 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to FAILED.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,154 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Job execution switched to status FAILED.
08/08/2016 10:14:01 Job execution switched to status FAILED.
10:14:01,160 INFO  org.apache.flink.runtime.client.JobClient                     - Job execution failed
10:14:01,160 INFO  org.apache.flink.runtime.client.JobClientActor                - Terminate JobClientActor.
10:14:01,161 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Stopping FlinkMiniCluster.
10:14:01,161 INFO  org.apache.flink.runtime.client.JobClientActor                - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1689614413].
10:14:01,174 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Stopping JobManager akka://flink/user/jobmanager_1.
10:14:01,184 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Stopping TaskManager akka://flink/user/taskmanager_1#2008243751.
10:14:01,184 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Disassociating from JobManager
10:14:01,192 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
10:14:01,194 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:34360
10:14:01,208 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager removed spill file directory /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b
10:14:01,212 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Task manager akka://flink/user/taskmanager_1 is completely shut down.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
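
Editorial note on the trace above: reply-code 406 (PRECONDITION_FAILED) is raised from the queueDeclare call in RMQSink.open, which is consistent with the queue 'flink-sink' already existing on the broker with parameters other than the ones being declared now. The sketch below is a toy model in plain Java, not RabbitMQ or Flink code. The class names and the set of compared properties (durable, exclusive, autoDelete, arguments, taken from the parameters of queueDeclare) are assumptions for illustration; the real equivalence rules are the broker's.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy model of a broker's queue registry (hypothetical; not RabbitMQ code). */
public class QueueDeclareSketch {

    /** Parameters assumed to be compared when a queue is declared again. */
    static final class QueueParams {
        final boolean durable;
        final boolean exclusive;
        final boolean autoDelete;
        final Map<String, Object> arguments;

        QueueParams(boolean durable, boolean exclusive, boolean autoDelete,
                    Map<String, Object> arguments) {
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
            // Treat a null argument table as an empty one.
            this.arguments = arguments == null ? new HashMap<>() : new HashMap<>(arguments);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof QueueParams)) {
                return false;
            }
            QueueParams p = (QueueParams) o;
            return durable == p.durable && exclusive == p.exclusive
                    && autoDelete == p.autoDelete && arguments.equals(p.arguments);
        }

        @Override
        public int hashCode() {
            return Objects.hash(durable, exclusive, autoDelete, arguments);
        }
    }

    private final Map<String, QueueParams> queues = new HashMap<>();

    /**
     * Returns true if the declare is accepted: the queue is new, or it exists
     * with equal parameters. Returns false for the "not equivalent" case that
     * the model uses to stand in for PRECONDITION_FAILED.
     */
    boolean declare(String name, QueueParams params) {
        QueueParams existing = queues.putIfAbsent(name, params);
        return existing == null || existing.equals(params);
    }

    public static void main(String[] args) {
        QueueDeclareSketch broker = new QueueDeclareSketch();
        // Suppose another client created the queue as durable earlier.
        System.out.println(broker.declare("flink-sink", new QueueParams(true, false, false, null)));
        // Declaring it again as non-durable is not equivalent and is rejected.
        System.out.println(broker.declare("flink-sink", new QueueParams(false, false, false, null)));
        // A different queue name avoids the clash.
        System.out.println(broker.declare("flink-sink-2", new QueueParams(false, false, false, null)));
    }
}
```

In this model the second declare fails only because the parameters differ, so either making the two declarations match or choosing an unused queue name lets the declare go through.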



________________________________
From: Robert Metzger <rm...@apache.org>
Sent: Monday, August 8, 2016 9:48:39 AM
To: user@flink.apache.org
Subject: Re: Using RabbitMQ Sinks

Hi Paul,

The example in the code is outdated; StringToByteSerializer was probably removed quite a while ago. I'll update the documentation once we have figured out the other problem you reported.
What's the exception you are getting?

Regards,
Robert


Re: Using RabbitMQ Sinks

Posted by Robert Metzger <rm...@apache.org>.
Hi Paul,

The example in the code is outdated; StringToByteSerializer was probably
removed quite a while ago. I'll update the documentation once we have
figured out the other problem you reported.
What's the exception you are getting?

Regards,
Robert
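
Editorial note: the third argument in the documentation example only has to turn each stream element into the bytes of a message body, which is the role the thread reports SimpleStringSchema filling. The sketch below shows that conversion in plain Java. ByteSerializer and Utf8StringSerializer are hypothetical stand-ins, not Flink classes, and the choice of UTF-8 is an assumption.

```java
import java.nio.charset.StandardCharsets;

public class StringToBytesSketch {

    /** Hypothetical stand-in for a serialization schema: one element in, one message body out. */
    interface ByteSerializer<T> {
        byte[] serialize(T element);
    }

    /** Encodes strings as UTF-8 bytes (the charset is an assumption of this sketch). */
    static final class Utf8StringSerializer implements ByteSerializer<String> {
        @Override
        public byte[] serialize(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        ByteSerializer<String> serializer = new Utf8StringSerializer();
        byte[] body = serializer.serialize("hello");
        // Decode again to show the round trip is lossless.
        System.out.println(new String(body, StandardCharsets.UTF_8));
    }
}
```

Naming the charset explicitly keeps the bytes identical across machines, which matters when the consumer of the queue runs with a different default encoding.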
