You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Paul Joireman <pa...@physiq.com> on 2016/08/08 14:33:37 UTC
Using RabbitMQ Sinks
Hi all,
The documentation describing the use of RabbitMQ as a sink gives the following example:
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost").setPort(5000).setUserName(..)
.setPassword(..).setVirtualHost("/").build();
stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer()));
However, a search of the flink github mirrored repo<https://github.com/apache/flink> does not show where StringToByteSerializer is defined and only shows it being used in the documentation of
this example. I've tried using a SimpleStringSchema which seems to handle serialization but this raises an exception when I attempt to
run it.
Does anyone have any experience with using a RabbitMQ sink? Any pointers as to what I'm doing wrong.
Thanks,
Paul
Re: Using RabbitMQ Sinks
Posted by Robert Metzger <rm...@apache.org>.
The error you've reported is coming from the RabbitMQ library. I guess the
problem is that the queue already exists, and it has been created with
different parameters [1].
Either you overwrite the "protected void setupQueue()" method in Flink to
match the params with the existing queue, or you use a different queue name.
I hope that helps.
[1]
http://rabbitmq.1065348.n5.nabble.com/java-io-IOException-because-of-Parameters-td1581.html
On Mon, Aug 8, 2016 at 5:27 PM, Paul Joireman <pa...@physiq.com>
wrote:
> Robert,
>
>
> It looks like the root cause exception is:
>
>
> com.rabbitmq.client.ShutdownSignalException: channel error; protocol
> method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> The full execution trace is shown below and occurs when I try to execute
> my process. In the process, I initially create a connection to RMQ using
> the RQQConnectionConfig.Builder() and this works fine if I read using a
> SimpleStringSchema but re-using the same connection configuration for the
> sink as follows:
>
> msgs.addSink(new RMQSink<String>(rmqConnConfig, ALERT_SINK_QUEUE, new
> SimpleStringSchema()));
>
> Where msgs are Strings.
>
> Paul
>
> Connected to the target VM, address: '127.0.0.1:42208', transport:
> 'socket'
> 10:13:50,555 INFO org.apache.flink.api.java.typeutils.TypeExtractor
> - class com.physiq.alert.AlertMessageIn is not a valid POJO type
> 10:13:57,035 INFO org.apache.flink.streaming.api.environment.LocalStreamEnvironment
> - Running job on local embedded Flink mini cluster
> 10:13:58,100 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster
> - Starting FlinkMiniCluster.
> 10:13:58,580 INFO akka.event.slf4j.Slf4jLogger
> - Slf4jLogger started
> 10:13:58,614 INFO org.apache.flink.runtime.blob.BlobServer
> - Created BLOB server storage directory /tmp/blobStore-b19e22df-4636-
> 4e96-bb45-db70c0bcc7e1
> 10:13:58,620 INFO org.apache.flink.runtime.blob.BlobServer
> - Started BLOB server at 0.0.0.0:34360 - max concurrent requests:
> 50 - max backlog: 1000
> 10:13:58,635 INFO org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory
> - Using job manager savepoint state backend.
> 10:13:58,642 INFO org.apache.flink.runtime.metrics.MetricRegistry
> - No metrics reporter configured, no metrics will be
> exposed/reported.
> 10:13:58,655 INFO org.apache.flink.runtime.jobmanager.MemoryArchivist
> - Started memory archivist akka://flink/user/archive_1
> 10:13:58,657 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Starting JobManager at akka://flink/user/jobmanager_1.
> 10:13:58,678 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
> - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
> 10:13:58,683 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Messages between TaskManager and JobManager have a max timeout of
> 10000 milliseconds
> 10:13:58,703 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Temporary file directory '/tmp': total 102 GB, usable 9 GB (8.82%
> usable)
> 10:13:58,844 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
> - Allocated 64 MB for network buffer pool (number of memory segments:
> 2048, bytes per segment: 32768).
> 10:13:58,846 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Limiting managed memory to 549 MB, memory will be allocated lazily.
> 10:14:00,163 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager
> - I/O manager uses directory /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b
> for spill files.
> 10:14:00,167 INFO org.apache.flink.runtime.jobmanager.JobManager
> - JobManager akka://flink/user/jobmanager_1 was granted leadership
> with leader session ID None.
> 10:14:00,239 INFO org.apache.flink.runtime.filecache.FileCache
> - User file cache uses directory /tmp/flink-dist-cache-
> 2e8256c3-532c-45b6-ad2d-fa7b43846a6e
> 10:14:00,239 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
> - Resource Manager associating with leading JobManager
> Actor[akka://flink/user/jobmanager_1#-1689614413] - leader session null
> 10:14:00,495 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Starting TaskManager actor at akka://flink/user/taskmanager_
> 1#2008243751.
> 10:14:00,496 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - TaskManager data connection information: localhost (dataPort=58335)
> 10:14:00,496 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - TaskManager has 4 task slot(s).
> 10:14:00,498 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Memory usage stats: [HEAP: 77/195/1701 MB, NON HEAP: 22/33/214 MB
> (used/committed/max)]
> 10:14:00,506 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Trying to register at JobManager akka://flink/user/jobmanager_1
> (attempt 1, timeout: 500 milliseconds)
> 10:14:00,508 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
> - TaskManager ResourceID{resourceId='8ca61ba2f8741d54ce649bd0d525d50c'}
> has started.
> 10:14:00,511 INFO org.apache.flink.runtime.instance.InstanceManager
> - Registered TaskManager at localhost (akka://flink/user/taskmanager_1)
> as db51cf9997f8059543464810ffaffea3. Current number of registered hosts
> is 1. Current number of alive task slots is 4.
> 10:14:00,517 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Successful registration at JobManager
> (akka://flink/user/jobmanager_1), starting network stack and library
> cache.
> 10:14:00,534 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Determined BLOB server address to be localhost/127.0.0.1:34360.
> Starting BLOB cache.
> 10:14:00,535 INFO org.apache.flink.runtime.blob.BlobCache
> - Created BLOB cache storage directory /tmp/blobStore-1a9647ad-11f0-
> 40de-b24c-0e2c57c030d0
> 10:14:00,535 INFO org.apache.flink.runtime.metrics.MetricRegistry
> - No metrics reporter configured, no metrics will be
> exposed/reported.
> 10:14:00,549 INFO org.apache.flink.runtime.client.JobClientActor
> - Received job Flink Streaming Job (6d641f666b26088a843355ff3b0b17
> 05).
> 10:14:00,549 INFO org.apache.flink.runtime.client.JobClientActor
> - Could not submit job Flink Streaming Job (
> 6d641f666b26088a843355ff3b0b1705), because there is no connection to a
> JobManager.
> 10:14:00,551 INFO org.apache.flink.runtime.client.JobClientActor
> - Disconnect from JobManager null.
> 10:14:00,557 INFO org.apache.flink.runtime.client.JobClientActor
> - Connect to JobManager Actor[akka://flink/user/
> jobmanager_1#-1689614413].
> 10:14:00,557 INFO org.apache.flink.runtime.client.JobClientActor
> - Connected to new JobManager akka://flink/user/jobmanager_1.
> 10:14:00,557 INFO org.apache.flink.runtime.client.JobClientActor
> - Sending message to JobManager akka://flink/user/jobmanager_1 to
> submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705) and
> wait for progress
> 10:14:00,558 INFO org.apache.flink.runtime.client.JobClientActor
> - Upload jar files to job manager akka://flink/user/jobmanager_1.
> 10:14:00,560 INFO org.apache.flink.runtime.client.JobClientActor
> - Submit job to the job manager akka://flink/user/jobmanager_1.
> 10:14:00,563 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Submitting job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming
> Job).
> 10:14:00,568 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Using restart strategy NoRestartStrategy for
> 6d641f666b26088a843355ff3b0b1705.
> 10:14:00,628 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Scheduling job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming
> Job).
> 10:14:00,628 INFO org.apache.flink.runtime.client.JobClientActor
> - Job was successfully submitted to the JobManager
> akka://flink/user/jobmanager_1.
> 10:14:00,631 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e)
> switched from CREATED to SCHEDULED
> 10:14:00,631 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Job execution switched to status RUNNING.
> 08/08/2016 10:14:00 Job execution switched to status RUNNING.
> 10:14:00,632 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to
> SCHEDULED
> 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
> 10:14:00,633 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming
> Job) changed to RUNNING.
> 10:14:00,637 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e)
> switched from SCHEDULED to DEPLOYING
> 10:14:00,638 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to
> DEPLOYING
> 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
> 10:14:00,638 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Deploying Source: Custom Source (1/1) (attempt #0) to localhost
> 10:14:00,642 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from
> CREATED to SCHEDULED
> 10:14:00,642 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
> 08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
> 10:14:00,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from
> SCHEDULED to DEPLOYING
> 10:14:00,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Deploying Map (1/4) (attempt #0) to localhost
> 10:14:00,643 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
> 08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
> 10:14:00,648 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from
> CREATED to SCHEDULED
> 10:14:00,648 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from
> SCHEDULED to DEPLOYING
> 10:14:00,649 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
> 08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
> 10:14:00,649 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
> 08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
> 10:14:00,649 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Deploying Map (2/4) (attempt #0) to localhost
> 10:14:00,650 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from
> CREATED to SCHEDULED
> 10:14:00,650 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
> 10:14:00,650 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from
> SCHEDULED to DEPLOYING
> 08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
> 10:14:00,655 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Deploying Map (3/4) (attempt #0) to localhost
> 10:14:00,656 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from
> CREATED to SCHEDULED
> 10:14:00,657 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from
> SCHEDULED to DEPLOYING
> 10:14:00,657 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
> 08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
> 10:14:00,659 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Deploying Map (4/4) (attempt #0) to localhost
> 10:14:00,659 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
> 08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
> 10:14:00,660 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
> 08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
> 10:14:00,660 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (1/4) (
> b3a3e69c7dd082fc744b6ec791697e35) switched from CREATED to SCHEDULED
> 10:14:00,660 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched
> to SCHEDULED
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to
> SCHEDULED
> 10:14:00,660 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (1/4) (
> b3a3e69c7dd082fc744b6ec791697e35) switched from SCHEDULED to DEPLOYING
> 10:14:00,661 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched
> to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to
> DEPLOYING
> 10:14:00,661 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Deploying Filter -> Map -> Sink: Unnamed (1/4) (attempt #0) to
> localhost
> 10:14:00,662 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (2/4) (
> 54a627d8091077e6742cd74d754016a7) switched from CREATED to SCHEDULED
> 10:14:00,662 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched
> to SCHEDULED
> 10:14:00,662 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (2/4) (
> 54a627d8091077e6742cd74d754016a7) switched from SCHEDULED to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to
> SCHEDULED
> 10:14:00,662 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched
> to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to
> DEPLOYING
> 10:14:00,662 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Deploying Filter -> Map -> Sink: Unnamed (2/4) (attempt #0) to
> localhost
> 10:14:00,663 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (3/4) (
> dafdfde1351e3eb9593fa227e8255b57) switched from CREATED to SCHEDULED
> 10:14:00,664 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched
> to SCHEDULED
> 10:14:00,664 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (3/4) (
> dafdfde1351e3eb9593fa227e8255b57) switched from SCHEDULED to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to
> SCHEDULED
> 10:14:00,664 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Deploying Filter -> Map -> Sink: Unnamed (3/4) (attempt #0) to
> localhost
> 10:14:00,664 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched
> to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to
> DEPLOYING
> 10:14:00,665 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (4/4) (
> 7c219dda43bc53571d306fc0df4468fa) switched from CREATED to SCHEDULED
> 10:14:00,666 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched
> to SCHEDULED
> 10:14:00,666 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (4/4) (
> 7c219dda43bc53571d306fc0df4468fa) switched from SCHEDULED to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to
> SCHEDULED
> 10:14:00,666 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Deploying Filter -> Map -> Sink: Unnamed (4/4) (attempt #0) to
> localhost
> 10:14:00,666 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched
> to DEPLOYING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to
> DEPLOYING
> 10:14:00,674 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Received task Source: Custom Source (1/1)
> 10:14:00,674 INFO org.apache.flink.runtime.taskmanager.Task
> - Loading JAR files for task Source: Custom Source (1/1)
> 10:14:00,678 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Received task Map (1/4)
> 10:14:00,678 INFO org.apache.flink.runtime.taskmanager.Task
> - Loading JAR files for task Map (1/4)
> 10:14:00,683 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Received task Map (2/4)
> 10:14:00,684 INFO org.apache.flink.runtime.taskmanager.Task
> - Loading JAR files for task Map (2/4)
> 10:14:00,684 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Received task Map (3/4)
> 10:14:00,691 INFO org.apache.flink.runtime.taskmanager.Task
> - Registering task at network: Source: Custom Source (1/1)
> [DEPLOYING]
> 10:14:00,692 INFO org.apache.flink.runtime.taskmanager.Task
> - Registering task at network: Map (2/4) [DEPLOYING]
> 10:14:00,691 INFO org.apache.flink.runtime.taskmanager.Task
> - Loading JAR files for task Map (3/4)
> 10:14:00,691 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Received task Map (4/4)
> 10:14:00,691 INFO org.apache.flink.runtime.taskmanager.Task
> - Registering task at network: Map (1/4) [DEPLOYING]
> 10:14:00,695 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Received task Filter -> Map -> Sink: Unnamed (1/4)
> 10:14:00,695 INFO org.apache.flink.runtime.taskmanager.Task
> - Registering task at network: Map (3/4) [DEPLOYING]
> 10:14:00,695 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Received task Filter -> Map -> Sink: Unnamed (2/4)
> 10:14:00,695 INFO org.apache.flink.runtime.taskmanager.Task
> - Loading JAR files for task Map (4/4)
> 10:14:00,696 INFO org.apache.flink.runtime.taskmanager.Task
> - Loading JAR files for task Filter -> Map -> Sink: Unnamed (1/4)
> 10:14:00,703 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (2/4) switched to RUNNING
> 10:14:00,704 INFO org.apache.flink.runtime.taskmanager.Task
> - Registering task at network: Filter -> Map -> Sink: Unnamed (1/4)
> [DEPLOYING]
> 10:14:00,701 INFO org.apache.flink.runtime.taskmanager.Task
> - Source: Custom Source (1/1) switched to RUNNING
> 10:14:00,698 INFO org.apache.flink.runtime.taskmanager.Task
> - Loading JAR files for task Filter -> Map -> Sink: Unnamed (2/4)
> 10:14:00,709 INFO org.apache.flink.runtime.taskmanager.Task
> - Registering task at network: Filter -> Map -> Sink: Unnamed (2/4)
> [DEPLOYING]
> 10:14:00,699 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Received task Filter -> Map -> Sink: Unnamed (3/4)
> 10:14:00,710 INFO org.apache.flink.runtime.taskmanager.Task
> - Loading JAR files for task Filter -> Map -> Sink: Unnamed (3/4)
> 10:14:00,698 INFO org.apache.flink.runtime.taskmanager.Task
> - Registering task at network: Map (4/4) [DEPLOYING]
> 10:14:00,711 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Received task Filter -> Map -> Sink: Unnamed (4/4)
> 10:14:00,710 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (3/4) switched to RUNNING
> 10:14:00,715 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (1/4) switched to RUNNING
> 10:14:00,719 INFO org.apache.flink.runtime.taskmanager.Task
> - Loading JAR files for task Filter -> Map -> Sink: Unnamed (4/4)
> 10:14:00,724 INFO org.apache.flink.runtime.taskmanager.Task
> - Filter -> Map -> Sink: Unnamed (1/4) switched to RUNNING
> 10:14:00,723 INFO org.apache.flink.runtime.taskmanager.Task
> - Filter -> Map -> Sink: Unnamed (2/4) switched to RUNNING
> 10:14:00,720 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (4/4) switched to RUNNING
> 10:14:00,718 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from
> DEPLOYING to RUNNING
> 10:14:00,725 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e)
> switched from DEPLOYING to RUNNING
> 10:14:00,725 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(2/4) switched to RUNNING
> 08/08/2016 10:14:00 Map(2/4) switched to RUNNING
> 10:14:00,726 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to
> RUNNING
> 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
> 10:14:00,730 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from
> DEPLOYING to RUNNING
> 10:14:00,730 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(3/4) switched to RUNNING
> 08/08/2016 10:14:00 Map(3/4) switched to RUNNING
> 10:14:00,732 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from
> DEPLOYING to RUNNING
> 10:14:00,732 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(1/4) switched to RUNNING
> 08/08/2016 10:14:00 Map(1/4) switched to RUNNING
> 10:14:00,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (1/4) (
> b3a3e69c7dd082fc744b6ec791697e35) switched from DEPLOYING to RUNNING
> 10:14:00,724 INFO org.apache.flink.runtime.taskmanager.Task
> - Registering task at network: Filter -> Map -> Sink: Unnamed (3/4)
> [DEPLOYING]
> 10:14:00,736 INFO org.apache.flink.runtime.taskmanager.Task
> - Registering task at network: Filter -> Map -> Sink: Unnamed (4/4)
> [DEPLOYING]
> 10:14:00,736 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched
> to RUNNING
> 10:14:00,750 INFO org.apache.flink.runtime.taskmanager.Task
> - Filter -> Map -> Sink: Unnamed (3/4) switched to RUNNING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to
> RUNNING
> 10:14:00,750 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (2/4) (
> 54a627d8091077e6742cd74d754016a7) switched from DEPLOYING to RUNNING
> 10:14:00,751 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from
> DEPLOYING to RUNNING
> 10:14:00,751 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched
> to RUNNING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to
> RUNNING
> 10:14:00,752 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Map(4/4) switched to RUNNING
> 08/08/2016 10:14:00 Map(4/4) switched to RUNNING
> 10:14:00,751 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (3/4) (
> dafdfde1351e3eb9593fa227e8255b57) switched from DEPLOYING to RUNNING
> 10:14:00,757 INFO org.apache.flink.runtime.taskmanager.Task
> - Filter -> Map -> Sink: Unnamed (4/4) switched to RUNNING
> 10:14:00,770 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (4/4) (
> 7c219dda43bc53571d306fc0df4468fa) switched from DEPLOYING to RUNNING
> 10:14:00,756 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,766 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,765 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched
> to RUNNING
> 10:14:00,771 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to
> RUNNING
> 10:14:00,771 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,776 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched
> to RUNNING
> 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to
> RUNNING
> 10:14:00,782 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,782 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,787 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,787 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,789 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,790 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,791 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,792 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,792 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,793 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,792 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,793 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,794 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,794 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,796 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,796 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,796 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,796 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,804 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,804 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,806 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,806 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,807 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,807 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,807 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,807 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,808 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,808 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:00,807 WARN org.apache.flink.streaming.runtime.tasks.StreamTask
> - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 10:14:00,809 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - State backend is set to heap memory (checkpoint to jobmanager)
> 10:14:01,032 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - Timer service is shutting down.
> 10:14:01,032 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - Timer service is shutting down.
> 10:14:01,034 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - Timer service is shutting down.
> 10:14:01,034 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - Timer service is shutting down.
> 10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator
> - Exception while closing user function while failing or canceling task
> com.rabbitmq.client.AlreadyClosedException: channel is already closed due
> to channel error; protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(
> AMQChannel.java:265)
> at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(
> ChannelN.java:261)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> close(RMQSink.java:114)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> closeFunction(FunctionUtils.java:45)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> dispose(AbstractUdfStreamOperator.java:107)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> disposeAllOperators(StreamTask.java:426)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:332)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> 10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator
> - Exception while closing user function while failing or canceling task
> com.rabbitmq.client.AlreadyClosedException: channel is already closed due
> to channel error; protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(
> AMQChannel.java:265)
> at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(
> ChannelN.java:261)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> close(RMQSink.java:114)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> closeFunction(FunctionUtils.java:45)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> dispose(AbstractUdfStreamOperator.java:107)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> disposeAllOperators(StreamTask.java:426)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:332)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> 10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator
> - Exception while closing user function while failing or canceling task
> com.rabbitmq.client.AlreadyClosedException: channel is already closed due
> to channel error; protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(
> AMQChannel.java:265)
> at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(
> ChannelN.java:261)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> close(RMQSink.java:114)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> closeFunction(FunctionUtils.java:45)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> dispose(AbstractUdfStreamOperator.java:107)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> disposeAllOperators(StreamTask.java:426)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:332)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> 10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task
> - Task execution failed.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator
> - Exception while closing user function while failing or canceling task
> com.rabbitmq.client.AlreadyClosedException: channel is already closed due
> to channel error; protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(
> AMQChannel.java:265)
> at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(
> ChannelN.java:261)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
> at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> close(RMQSink.java:114)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> closeFunction(FunctionUtils.java:45)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> dispose(AbstractUdfStreamOperator.java:107)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> disposeAllOperators(StreamTask.java:426)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:332)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> 10:14:01,041 INFO org.apache.flink.runtime.taskmanager.Task
> - Filter -> Map -> Sink: Unnamed (3/4) switched to FAILED with
> exception.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task
> - Task execution failed.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,043 INFO org.apache.flink.runtime.taskmanager.Task
> - Filter -> Map -> Sink: Unnamed (2/4) switched to FAILED with
> exception.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,039 ERROR org.apache.flink.runtime.taskmanager.Task
> - Task execution failed.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,041 ERROR org.apache.flink.runtime.taskmanager.Task
> - Task execution failed.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,045 INFO org.apache.flink.runtime.taskmanager.Task
> - Filter -> Map -> Sink: Unnamed (1/4) switched to FAILED with
> exception.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,046 INFO org.apache.flink.runtime.taskmanager.Task
> - Filter -> Map -> Sink: Unnamed (4/4) switched to FAILED with
> exception.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,057 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for Filter -> Map -> Sink: Unnamed (4/4)
> 10:14:01,058 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for Filter -> Map -> Sink: Unnamed (1/4)
> 10:14:01,059 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for Filter -> Map -> Sink: Unnamed (2/4)
> 10:14:01,063 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for Filter -> Map -> Sink: Unnamed (3/4)
> 10:14:01,074 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Un-registering task and sending final execution state FAILED to
> JobManager for task Filter -> Map -> Sink: Unnamed (
> 7c219dda43bc53571d306fc0df4468fa)
> 10:14:01,078 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Un-registering task and sending final execution state FAILED to
> JobManager for task Filter -> Map -> Sink: Unnamed (
> dafdfde1351e3eb9593fa227e8255b57)
> 10:14:01,080 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Un-registering task and sending final execution state FAILED to
> JobManager for task Filter -> Map -> Sink: Unnamed (
> 54a627d8091077e6742cd74d754016a7)
> 10:14:01,082 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Un-registering task and sending final execution state FAILED to
> JobManager for task Filter -> Map -> Sink: Unnamed (
> b3a3e69c7dd082fc744b6ec791697e35)
> 10:14:01,085 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (4/4) (
> 7c219dda43bc53571d306fc0df4468fa) switched from RUNNING to FAILED
> 10:14:01,085 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (3/4) (
> dafdfde1351e3eb9593fa227e8255b57) switched from RUNNING to FAILED
> 10:14:01,086 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (2/4) (
> 54a627d8091077e6742cd74d754016a7) switched from RUNNING to FAILED
> 10:14:01,086 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched
> to FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched to
> FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 10:14:01,087 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched
> to FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched to
> FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 10:14:01,088 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched
> to FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched to
> FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 10:14:01,089 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Filter -> Map -> Sink: Unnamed (1/4) (
> b3a3e69c7dd082fc744b6ec791697e35) switched from RUNNING to FAILED
> 10:14:01,090 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched
> to FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched to
> FAILED
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
> 10:14:01,092 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming
> Job) changed to FAILING.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,092 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Job execution switched to status FAILING.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,092 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e)
> switched from RUNNING to CANCELING
> 08/08/2016 10:14:01 Job execution switched to status FAILING.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,093 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to
> CANCELING
> 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
> 10:14:01,095 INFO org.apache.flink.runtime.taskmanager.Task
> - Attempting to cancel task Source: Custom Source (1/1)
> 10:14:01,096 INFO org.apache.flink.runtime.taskmanager.Task
> - Source: Custom Source (1/1) switched to CANCELING
> 10:14:01,096 INFO org.apache.flink.runtime.taskmanager.Task
> - Triggering cancellation of task code Source: Custom Source (1/1) (
> 9ffba0e9f84adca163121d50f88e519e).
> 10:14:01,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from
> RUNNING to CANCELING
> 10:14:01,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from
> RUNNING to CANCELING
> 10:14:01,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from
> RUNNING to CANCELING
> 10:14:01,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from
> RUNNING to CANCELING
> 10:14:01,098 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Map(1/4) switched to CANCELING
> 08/08/2016 10:14:01 Map(1/4) switched to CANCELING
> 10:14:01,098 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Map(2/4) switched to CANCELING
> 08/08/2016 10:14:01 Map(2/4) switched to CANCELING
> 10:14:01,099 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Map(3/4) switched to CANCELING
> 08/08/2016 10:14:01 Map(3/4) switched to CANCELING
> 10:14:01,100 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Map(4/4) switched to CANCELING
> 08/08/2016 10:14:01 Map(4/4) switched to CANCELING
> 10:14:01,107 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - Timer service is shutting down.
> 10:14:01,109 INFO org.apache.flink.runtime.taskmanager.Task
> - Attempting to cancel task Map (1/4)
> 10:14:01,109 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (1/4) switched to CANCELING
> 10:14:01,109 INFO org.apache.flink.runtime.taskmanager.Task
> - Triggering cancellation of task code Map (1/4) (
> 4a13efc94d64d61532106fbd9bdfaedb).
> 10:14:01,110 INFO org.apache.flink.runtime.taskmanager.Task
> - Attempting to cancel task Map (2/4)
> 10:14:01,110 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - Timer service is shutting down.
> 10:14:01,110 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (2/4) switched to CANCELING
> 10:14:01,110 INFO org.apache.flink.runtime.taskmanager.Task
> - Triggering cancellation of task code Map (2/4) (
> 99136f14b2e32b44318b49b2ad39dde5).
> 10:14:01,111 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - Timer service is shutting down.
> 10:14:01,111 INFO org.apache.flink.runtime.taskmanager.Task
> - Attempting to cancel task Map (3/4)
> 10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (3/4) switched to CANCELING
> 10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task
> - Triggering cancellation of task code Map (3/4) (
> 029bc351981b5056208839ce988f0f3f).
> 10:14:01,112 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - Timer service is shutting down.
> 10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task
> - Attempting to cancel task Map (4/4)
> 10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (4/4) switched to CANCELING
> 10:14:01,113 INFO org.apache.flink.runtime.taskmanager.Task
> - Triggering cancellation of task code Map (4/4) (
> ae5e9324d04ffac7843317749a2e86dd).
> 10:14:01,114 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (1/4) switched to CANCELED
> 10:14:01,114 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for Map (1/4)
> 10:14:01,116 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Discarding the results produced by task execution
> b3a3e69c7dd082fc744b6ec791697e35
> 10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (2/4) switched to CANCELED
> 10:14:01,118 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Discarding the results produced by task execution
> 54a627d8091077e6742cd74d754016a7
> 10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (3/4) switched to CANCELED
> 10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for Map (2/4)
> 10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for Map (3/4)
> 10:14:01,118 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Discarding the results produced by task execution
> dafdfde1351e3eb9593fa227e8255b57
> 10:14:01,119 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Discarding the results produced by task execution
> 7c219dda43bc53571d306fc0df4468fa
> 10:14:01,119 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Un-registering task and sending final execution state CANCELED to
> JobManager for task Map (4a13efc94d64d61532106fbd9bdfaedb)
> 10:14:01,120 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Un-registering task and sending final execution state CANCELED to
> JobManager for task Map (99136f14b2e32b44318b49b2ad39dde5)
> 10:14:01,120 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Un-registering task and sending final execution state CANCELED to
> JobManager for task Map (029bc351981b5056208839ce988f0f3f)
> 10:14:01,121 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from
> CANCELING to CANCELED
> 10:14:01,121 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from
> CANCELING to CANCELED
> 10:14:01,121 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
> - Timer service is shutting down.
> 10:14:01,122 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from
> CANCELING to CANCELED
> 10:14:01,122 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Map(1/4) switched to CANCELED
> 08/08/2016 10:14:01 Map(1/4) switched to CANCELED
> 10:14:01,123 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Map(2/4) switched to CANCELED
> 08/08/2016 10:14:01 Map(2/4) switched to CANCELED
> 10:14:01,123 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Map(3/4) switched to CANCELED
> 10:14:01,124 INFO org.apache.flink.runtime.taskmanager.Task
> - Map (4/4) switched to CANCELED
> 08/08/2016 10:14:01 Map(3/4) switched to CANCELED
> 10:14:01,124 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for Map (4/4)
> 10:14:01,124 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Un-registering task and sending final execution state CANCELED to
> JobManager for task Map (ae5e9324d04ffac7843317749a2e86dd)
> 10:14:01,131 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from
> CANCELING to CANCELED
> 10:14:01,132 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Map(4/4) switched to CANCELED
> 08/08/2016 10:14:01 Map(4/4) switched to CANCELED
> 10:14:01,147 INFO org.apache.flink.runtime.taskmanager.Task
> - Source: Custom Source (1/1) switched to CANCELED
> 10:14:01,148 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for Source: Custom Source (1/1)
> 10:14:01,148 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Un-registering task and sending final execution state CANCELED to
> JobManager for task Source: Custom Source (9ffba0e9f84adca163121d50f88e51
> 9e)
> 10:14:01,149 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e)
> switched from CANCELING to CANCELED
> 10:14:01,150 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to
> CANCELED
> 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
> 10:14:01,154 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming
> Job) changed to FAILED.
> java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
> 10:14:01,154 INFO org.apache.flink.runtime.client.JobClientActor
> - 08/08/2016 10:14:01 Job execution switched to status FAILED.
> 08/08/2016 10:14:01 Job execution switched to status FAILED.
> 10:14:01,160 INFO org.apache.flink.runtime.client.JobClient
> - Job execution failed
> 10:14:01,160 INFO org.apache.flink.runtime.client.JobClientActor
> - Terminate JobClientActor.
> 10:14:01,161 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster
> - Stopping FlinkMiniCluster.
> 10:14:01,161 INFO org.apache.flink.runtime.client.JobClientActor
> - Disconnect from JobManager Actor[akka://flink/user/
> jobmanager_1#-1689614413].
> 10:14:01,174 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Stopping JobManager akka://flink/user/jobmanager_1.
> 10:14:01,184 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Stopping TaskManager akka://flink/user/taskmanager_1#2008243751.
> 10:14:01,184 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Disassociating from JobManager
> 10:14:01,192 INFO org.apache.flink.runtime.blob.BlobCache
> - Shutting down BlobCache
> 10:14:01,194 INFO org.apache.flink.runtime.blob.BlobServer
> - Stopped BLOB server at 0.0.0.0:34360
> 10:14:01,208 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager
> - I/O manager removed spill file directory
> /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b
> 10:14:01,212 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Task manager akka://flink/user/taskmanager_1 is completely shut
> down.
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException:
> Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$
> mcV$sp(JobManager.scala:822)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Error while creating the channel
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:84)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.
> open(RMQSink.java:82)
> ... 6 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:67)
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:33)
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:343)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
> ... 9 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost
> '/' not equivalent, class-id=50, method-id=10)
> at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
> at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(
> AMQChannel.java:144)
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(
> AMQConnection.java:550)
> ... 1 more
>
>
> ------------------------------
> *From:* Robert Metzger <rm...@apache.org>
> *Sent:* Monday, August 8, 2016 9:48:39 AM
> *To:* user@flink.apache.org
> *Subject:* Re: Using RabbitMQ Sinks
>
> Hi Paul,
>
> the example in the code is outdated, StringToByteSerializer has probably
> been removed quite a while ago. I'll update the documentation once we
> figured out the other problem you reported.
> What's the exception you are getting?
>
> Regards,
> Robert
>
> On Mon, Aug 8, 2016 at 4:33 PM, Paul Joireman <pa...@physiq.com>
> wrote:
>
>> Hi all,
>>
>>
>> The documentation describing the use of RabbitMQ as a sink gives the
>> following example:
>>
>>
>> RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5000).setUserName(..).setPassword(..).setVirtualHost("/").build();stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer()));
>>
>> However, a search of the flink github mirrored repo
>> <https://github.com/apache/flink> does not show where
>> StringToByteSerializer is defined and only shows it being used in the
>> documentation of
>>
>> this example. I've tried using a SimpleStringSchema which seems to
>> handle serialization but this raises an exception when I attempt to
>>
>> run it.
>>
>>
>> Does anyone have any experience with using a RabbitMQ sink? Any
>> pointers as to what I'm doing wrong.
>>
>>
>> Thanks,
>>
>> Paul
>>
>>
>
Re: Using RabbitMQ Sinks
Posted by Paul Joireman <pa...@physiq.com>.
Robert,
It looks like the root cause exception is:
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
The full execution trace is shown below and occurs when I try to execute my process. In the process, I initially create a connection to RMQ using the RQQConnectionConfig.Builder() and this works fine if I read using a SimpleStringSchema but re-using the same connection configuration for the sink as follows:
msgs.addSink(new RMQSink<String>(rmqConnConfig, ALERT_SINK_QUEUE, new SimpleStringSchema()));
Where msgs are Strings.
Paul
Connected to the target VM, address: '127.0.0.1:42208', transport: 'socket'
10:13:50,555 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.physiq.alert.AlertMessageIn is not a valid POJO type
10:13:57,035 INFO org.apache.flink.streaming.api.environment.LocalStreamEnvironment - Running job on local embedded Flink mini cluster
10:13:58,100 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
10:13:58,580 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
10:13:58,614 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-b19e22df-4636-4e96-bb45-db70c0bcc7e1
10:13:58,620 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:34360 - max concurrent requests: 50 - max backlog: 1000
10:13:58,635 INFO org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory - Using job manager savepoint state backend.
10:13:58,642 INFO org.apache.flink.runtime.metrics.MetricRegistry - No metrics reporter configured, no metrics will be exposed/reported.
10:13:58,655 INFO org.apache.flink.runtime.jobmanager.MemoryArchivist - Started memory archivist akka://flink/user/archive_1
10:13:58,657 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka://flink/user/jobmanager_1.
10:13:58,678 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
10:13:58,683 INFO org.apache.flink.runtime.taskmanager.TaskManager - Messages between TaskManager and JobManager have a max timeout of 10000 milliseconds
10:13:58,703 INFO org.apache.flink.runtime.taskmanager.TaskManager - Temporary file directory '/tmp': total 102 GB, usable 9 GB (8.82% usable)
10:13:58,844 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
10:13:58,846 INFO org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 549 MB, memory will be allocated lazily.
10:14:00,163 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b for spill files.
10:14:00,167 INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID None.
10:14:00,239 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-2e8256c3-532c-45b6-ad2d-fa7b43846a6e
10:14:00,239 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1689614413] - leader session null
10:14:00,495 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager_1#2008243751.
10:14:00,496 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: localhost (dataPort=58335)
10:14:00,496 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 4 task slot(s).
10:14:00,498 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 77/195/1701 MB, NON HEAP: 22/33/214 MB (used/committed/max)]
10:14:00,506 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
10:14:00,508 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - TaskManager ResourceID{resourceId='8ca61ba2f8741d54ce649bd0d525d50c'} has started.
10:14:00,511 INFO org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as db51cf9997f8059543464810ffaffea3. Current number of registered hosts is 1. Current number of alive task slots is 4.
10:14:00,517 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
10:14:00,534 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be localhost/127.0.0.1:34360. Starting BLOB cache.
10:14:00,535 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-1a9647ad-11f0-40de-b24c-0e2c57c030d0
10:14:00,535 INFO org.apache.flink.runtime.metrics.MetricRegistry - No metrics reporter configured, no metrics will be exposed/reported.
10:14:00,549 INFO org.apache.flink.runtime.client.JobClientActor - Received job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705).
10:14:00,549 INFO org.apache.flink.runtime.client.JobClientActor - Could not submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705), because there is no connection to a JobManager.
10:14:00,551 INFO org.apache.flink.runtime.client.JobClientActor - Disconnect from JobManager null.
10:14:00,557 INFO org.apache.flink.runtime.client.JobClientActor - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1689614413].
10:14:00,557 INFO org.apache.flink.runtime.client.JobClientActor - Connected to new JobManager akka://flink/user/jobmanager_1.
10:14:00,557 INFO org.apache.flink.runtime.client.JobClientActor - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705) and wait for progress
10:14:00,558 INFO org.apache.flink.runtime.client.JobClientActor - Upload jar files to job manager akka://flink/user/jobmanager_1.
10:14:00,560 INFO org.apache.flink.runtime.client.JobClientActor - Submit job to the job manager akka://flink/user/jobmanager_1.
10:14:00,563 INFO org.apache.flink.runtime.jobmanager.JobManager - Submitting job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job).
10:14:00,568 INFO org.apache.flink.runtime.jobmanager.JobManager - Using restart strategy NoRestartStrategy for 6d641f666b26088a843355ff3b0b1705.
10:14:00,628 INFO org.apache.flink.runtime.jobmanager.JobManager - Scheduling job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job).
10:14:00,628 INFO org.apache.flink.runtime.client.JobClientActor - Job was successfully submitted to the JobManager akka://flink/user/jobmanager_1.
10:14:00,631 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from CREATED to SCHEDULED
10:14:00,631 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Job execution switched to status RUNNING.
08/08/2016 10:14:00 Job execution switched to status RUNNING.
10:14:00,632 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
10:14:00,633 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to RUNNING.
10:14:00,637 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from SCHEDULED to DEPLOYING
10:14:00,638 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
10:14:00,638 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (1/1) (attempt #0) to localhost
10:14:00,642 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from CREATED to SCHEDULED
10:14:00,642 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
10:14:00,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from SCHEDULED to DEPLOYING
10:14:00,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (1/4) (attempt #0) to localhost
10:14:00,643 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
10:14:00,648 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from CREATED to SCHEDULED
10:14:00,648 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from SCHEDULED to DEPLOYING
10:14:00,649 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
10:14:00,649 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
10:14:00,649 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (2/4) (attempt #0) to localhost
10:14:00,650 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from CREATED to SCHEDULED
10:14:00,650 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
10:14:00,650 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
10:14:00,655 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (3/4) (attempt #0) to localhost
10:14:00,656 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from CREATED to SCHEDULED
10:14:00,657 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from SCHEDULED to DEPLOYING
10:14:00,657 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
10:14:00,659 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (4/4) (attempt #0) to localhost
10:14:00,659 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
10:14:00,660 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
10:14:00,660 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from CREATED to SCHEDULED
10:14:00,660 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to SCHEDULED
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to SCHEDULED
10:14:00,660 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from SCHEDULED to DEPLOYING
10:14:00,661 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to DEPLOYING
10:14:00,661 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (1/4) (attempt #0) to localhost
10:14:00,662 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from CREATED to SCHEDULED
10:14:00,662 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10:14:00,662 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10:14:00,662 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to DEPLOYING
10:14:00,662 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (2/4) (attempt #0) to localhost
10:14:00,663 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from CREATED to SCHEDULED
10:14:00,664 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10:14:00,664 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10:14:00,664 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (3/4) (attempt #0) to localhost
10:14:00,664 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to DEPLOYING
10:14:00,665 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from CREATED to SCHEDULED
10:14:00,666 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10:14:00,666 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10:14:00,666 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (4/4) (attempt #0) to localhost
10:14:00,666 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to DEPLOYING
10:14:00,674 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Source: Custom Source (1/1)
10:14:00,674 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Custom Source (1/1)
10:14:00,678 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Map (1/4)
10:14:00,678 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Map (1/4)
10:14:00,683 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Map (2/4)
10:14:00,684 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Map (2/4)
10:14:00,684 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Map (3/4)
10:14:00,691 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Custom Source (1/1) [DEPLOYING]
10:14:00,692 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Map (2/4) [DEPLOYING]
10:14:00,691 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Map (3/4)
10:14:00,691 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Map (4/4)
10:14:00,691 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Map (1/4) [DEPLOYING]
10:14:00,695 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Filter -> Map -> Sink: Unnamed (1/4)
10:14:00,695 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Map (3/4) [DEPLOYING]
10:14:00,695 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Filter -> Map -> Sink: Unnamed (2/4)
10:14:00,695 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Map (4/4)
10:14:00,696 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Filter -> Map -> Sink: Unnamed (1/4)
10:14:00,703 INFO org.apache.flink.runtime.taskmanager.Task - Map (2/4) switched to RUNNING
10:14:00,704 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Filter -> Map -> Sink: Unnamed (1/4) [DEPLOYING]
10:14:00,701 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) switched to RUNNING
10:14:00,698 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Filter -> Map -> Sink: Unnamed (2/4)
10:14:00,709 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Filter -> Map -> Sink: Unnamed (2/4) [DEPLOYING]
10:14:00,699 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Filter -> Map -> Sink: Unnamed (3/4)
10:14:00,710 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Filter -> Map -> Sink: Unnamed (3/4)
10:14:00,698 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Map (4/4) [DEPLOYING]
10:14:00,711 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Filter -> Map -> Sink: Unnamed (4/4)
10:14:00,710 INFO org.apache.flink.runtime.taskmanager.Task - Map (3/4) switched to RUNNING
10:14:00,715 INFO org.apache.flink.runtime.taskmanager.Task - Map (1/4) switched to RUNNING
10:14:00,719 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Filter -> Map -> Sink: Unnamed (4/4)
10:14:00,724 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (1/4) switched to RUNNING
10:14:00,723 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (2/4) switched to RUNNING
10:14:00,720 INFO org.apache.flink.runtime.taskmanager.Task - Map (4/4) switched to RUNNING
10:14:00,718 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from DEPLOYING to RUNNING
10:14:00,725 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from DEPLOYING to RUNNING
10:14:00,725 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(2/4) switched to RUNNING
08/08/2016 10:14:00 Map(2/4) switched to RUNNING
10:14:00,726 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
10:14:00,730 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from DEPLOYING to RUNNING
10:14:00,730 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(3/4) switched to RUNNING
08/08/2016 10:14:00 Map(3/4) switched to RUNNING
10:14:00,732 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from DEPLOYING to RUNNING
10:14:00,732 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(1/4) switched to RUNNING
08/08/2016 10:14:00 Map(1/4) switched to RUNNING
10:14:00,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from DEPLOYING to RUNNING
10:14:00,724 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Filter -> Map -> Sink: Unnamed (3/4) [DEPLOYING]
10:14:00,736 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Filter -> Map -> Sink: Unnamed (4/4) [DEPLOYING]
10:14:00,736 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to RUNNING
10:14:00,750 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (3/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to RUNNING
10:14:00,750 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from DEPLOYING to RUNNING
10:14:00,751 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from DEPLOYING to RUNNING
10:14:00,751 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to RUNNING
10:14:00,752 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(4/4) switched to RUNNING
08/08/2016 10:14:00 Map(4/4) switched to RUNNING
10:14:00,751 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from DEPLOYING to RUNNING
10:14:00,757 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (4/4) switched to RUNNING
10:14:00,770 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from DEPLOYING to RUNNING
10:14:00,756 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,766 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,765 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to RUNNING
10:14:00,771 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to RUNNING
10:14:00,771 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,776 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to RUNNING
10:14:00,782 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,782 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,787 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,787 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,789 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,790 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,791 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,792 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,792 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,793 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,792 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,793 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,794 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,794 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,796 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,796 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,796 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,796 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,804 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,804 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,806 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,806 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,807 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,807 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,808 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,808 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,809 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:01,032 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,032 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,034 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,034 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,041 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (3/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,043 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (2/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,039 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,041 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,045 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (1/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,046 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (4/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,057 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (4/4)
10:14:01,058 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (1/4)
10:14:01,059 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (2/4)
10:14:01,063 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (3/4)
10:14:01,074 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (7c219dda43bc53571d306fc0df4468fa)
10:14:01,078 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (dafdfde1351e3eb9593fa227e8255b57)
10:14:01,080 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (54a627d8091077e6742cd74d754016a7)
10:14:01,082 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (b3a3e69c7dd082fc744b6ec791697e35)
10:14:01,085 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from RUNNING to FAILED
10:14:01,085 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from RUNNING to FAILED
10:14:01,086 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from RUNNING to FAILED
10:14:01,086 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,087 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,088 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,089 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from RUNNING to FAILED
10:14:01,090 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,092 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to FAILING.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,092 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Job execution switched to status FAILING.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,092 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from RUNNING to CANCELING
08/08/2016 10:14:01 Job execution switched to status FAILING.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,093 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
10:14:01,095 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source (1/1)
10:14:01,096 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) switched to CANCELING
10:14:01,096 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e).
10:14:01,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from RUNNING to CANCELING
10:14:01,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from RUNNING to CANCELING
10:14:01,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from RUNNING to CANCELING
10:14:01,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from RUNNING to CANCELING
10:14:01,098 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(1/4) switched to CANCELING
08/08/2016 10:14:01 Map(1/4) switched to CANCELING
10:14:01,098 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(2/4) switched to CANCELING
08/08/2016 10:14:01 Map(2/4) switched to CANCELING
10:14:01,099 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(3/4) switched to CANCELING
08/08/2016 10:14:01 Map(3/4) switched to CANCELING
10:14:01,100 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(4/4) switched to CANCELING
08/08/2016 10:14:01 Map(4/4) switched to CANCELING
10:14:01,107 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,109 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Map (1/4)
10:14:01,109 INFO org.apache.flink.runtime.taskmanager.Task - Map (1/4) switched to CANCELING
10:14:01,109 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb).
10:14:01,110 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Map (2/4)
10:14:01,110 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,110 INFO org.apache.flink.runtime.taskmanager.Task - Map (2/4) switched to CANCELING
10:14:01,110 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Map (2/4) (99136f14b2e32b44318b49b2ad39dde5).
10:14:01,111 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,111 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Map (3/4)
10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task - Map (3/4) switched to CANCELING
10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Map (3/4) (029bc351981b5056208839ce988f0f3f).
10:14:01,112 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Map (4/4)
10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task - Map (4/4) switched to CANCELING
10:14:01,113 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Map (4/4) (ae5e9324d04ffac7843317749a2e86dd).
10:14:01,114 INFO org.apache.flink.runtime.taskmanager.Task - Map (1/4) switched to CANCELED
10:14:01,114 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map (1/4)
10:14:01,116 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution b3a3e69c7dd082fc744b6ec791697e35
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task - Map (2/4) switched to CANCELED
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution 54a627d8091077e6742cd74d754016a7
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task - Map (3/4) switched to CANCELED
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map (2/4)
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map (3/4)
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution dafdfde1351e3eb9593fa227e8255b57
10:14:01,119 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution 7c219dda43bc53571d306fc0df4468fa
10:14:01,119 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Map (4a13efc94d64d61532106fbd9bdfaedb)
10:14:01,120 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Map (99136f14b2e32b44318b49b2ad39dde5)
10:14:01,120 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Map (029bc351981b5056208839ce988f0f3f)
10:14:01,121 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from CANCELING to CANCELED
10:14:01,121 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from CANCELING to CANCELED
10:14:01,121 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,122 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from CANCELING to CANCELED
10:14:01,122 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(1/4) switched to CANCELED
08/08/2016 10:14:01 Map(1/4) switched to CANCELED
10:14:01,123 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(2/4) switched to CANCELED
08/08/2016 10:14:01 Map(2/4) switched to CANCELED
10:14:01,123 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(3/4) switched to CANCELED
10:14:01,124 INFO org.apache.flink.runtime.taskmanager.Task - Map (4/4) switched to CANCELED
08/08/2016 10:14:01 Map(3/4) switched to CANCELED
10:14:01,124 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map (4/4)
10:14:01,124 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Map (ae5e9324d04ffac7843317749a2e86dd)
10:14:01,131 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from CANCELING to CANCELED
10:14:01,132 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(4/4) switched to CANCELED
08/08/2016 10:14:01 Map(4/4) switched to CANCELED
10:14:01,147 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) switched to CANCELED
10:14:01,148 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (1/1)
10:14:01,148 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source (9ffba0e9f84adca163121d50f88e519e)
10:14:01,149 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from CANCELING to CANCELED
10:14:01,150 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
10:14:01,154 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to FAILED.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,154 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Job execution switched to status FAILED.
08/08/2016 10:14:01 Job execution switched to status FAILED.
10:14:01,160 INFO org.apache.flink.runtime.client.JobClient - Job execution failed
10:14:01,160 INFO org.apache.flink.runtime.client.JobClientActor - Terminate JobClientActor.
10:14:01,161 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Stopping FlinkMiniCluster.
10:14:01,161 INFO org.apache.flink.runtime.client.JobClientActor - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1689614413].
10:14:01,174 INFO org.apache.flink.runtime.jobmanager.JobManager - Stopping JobManager akka://flink/user/jobmanager_1.
10:14:01,184 INFO org.apache.flink.runtime.taskmanager.TaskManager - Stopping TaskManager akka://flink/user/taskmanager_1#2008243751.
10:14:01,184 INFO org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager
10:14:01,192 INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
10:14:01,194 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:34360
10:14:01,208 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b
10:14:01,212 INFO org.apache.flink.runtime.taskmanager.TaskManager - Task manager akka://flink/user/taskmanager_1 is completely shut down.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
________________________________
From: Robert Metzger <rm...@apache.org>
Sent: Monday, August 8, 2016 9:48:39 AM
To: user@flink.apache.org
Subject: Re: Using RabbitMQ Sinks
Hi Paul,
the example in the code is outdated, StringToByteSerializer has probably been removed quite a while ago. I'll update the documentation once we figured out the other problem you reported.
What's the exception you are getting?
Regards,
Robert
On Mon, Aug 8, 2016 at 4:33 PM, Paul Joireman <pa...@physiq.com>> wrote:
Hi all,
The documentation describing the use of RabbitMQ as a sink gives the following example:
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost").setPort(5000).setUserName(..)
.setPassword(..).setVirtualHost("/").build();
stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer()));
However, a search of the flink github mirrored repo<https://github.com/apache/flink> does not show where StringToByteSerializer is defined and only shows it being used in the documentation of
this example. I've tried using a SimpleStringSchema which seems to handle serialization but this raises an exception when I attempt to
run it.
Does anyone have any experience with using a RabbitMQ sink? Any pointers as to what I'm doing wrong.
Thanks,
Paul
Re: Using RabbitMQ Sinks
Posted by Robert Metzger <rm...@apache.org>.
Hi Paul,
the example in the code is outdated, StringToByteSerializer has probably
been removed quite a while ago. I'll update the documentation once we
figured out the other problem you reported.
What's the exception you are getting?
Regards,
Robert
On Mon, Aug 8, 2016 at 4:33 PM, Paul Joireman <pa...@physiq.com>
wrote:
> Hi all,
>
>
> The documentation describing the use of RabbitMQ as a sink gives the
> following example:
>
>
> RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5000).setUserName(..).setPassword(..).setVirtualHost("/").build();stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer()));
>
> However, a search of the flink github mirrored repo
> <https://github.com/apache/flink> does not show where
> StringToByteSerializer is defined and only shows it being used in the
> documentation of
>
> this example. I've tried using a SimpleStringSchema which seems to
> handle serialization but this raises an exception when I attempt to
>
> run it.
>
>
> Does anyone have any experience with using a RabbitMQ sink? Any pointers
> as to what I'm doing wrong.
>
>
> Thanks,
>
> Paul
>
>