Posted to user@flink.apache.org by Eamon Kavanagh <ka...@gmail.com> on 2016/06/19 22:17:18 UTC

Cassandra Connector in Scala

Hey Mailing List,

I'm trying to use the Cassandra connector that came out recently (
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/cassandra.html)
in Scala but I'm having trouble with types when I use
CassandraSink.addSink(in: DataStream).

If I don't define the type, the compiler can't seem to infer it properly, and
if I do define the type, I still get an error saying there's a type mismatch.
The compile error is

*error: type arguments [(String, String, Int),Any] do not conform to method
addSink's type parameter bounds [IN,T <:
org.apache.flink.api.java.tuple.Tuple]*

Is this a Scala issue?  Should I switch over to Java?


Thanks!
Eamon

Re: Cassandra Connector in Scala

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think the root problem is that the CassandraSink methods that can work
with tuples accept the Tuple type that comes with Flink
(org.apache.flink.api.java.tuple.Tuple) and not the Scala tuple types. If
I'm not mistaken, Robert is using the Flink Tuple types in his example,
which is why it works.
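
To illustrate the distinction, here is a minimal sketch of converting a Scala tuple into Flink's Java Tuple3, which extends org.apache.flink.api.java.tuple.Tuple and therefore satisfies the bound named in the compile error. The object name ScalaToFlinkTuple and the helper toFlinkTuple are made up for this example and are not part of Flink or the connector; the element types are taken from the (String, String, Int) in the error message.

```scala
import org.apache.flink.api.java.tuple.{Tuple3 => FlinkTuple3}

// Hypothetical helper object, named here for illustration only.
object ScalaToFlinkTuple {

  // Copies the three fields of a Scala tuple into a Flink Java Tuple3.
  // Int is boxed explicitly because the Java tuple holds java.lang.Integer.
  def toFlinkTuple(t: (String, String, Int)): FlinkTuple3[String, String, java.lang.Integer] =
    new FlinkTuple3[String, String, java.lang.Integer](t._1, t._2, Int.box(t._3))
}
```

One way this might be used is to map a Scala DataStream through toFlinkTuple before handing it to CassandraSink.addSink, assuming the Scala API can derive type information for the Java tuple in your Flink version. Whether that is the most convenient route for a given release is something to verify against the connector documentation.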

Cheers,
Aljoscha

On Tue, 21 Jun 2016 at 11:54 Robert Metzger <rm...@apache.org> wrote:

> Thank you for reporting the issue.
> We are very happy if people try out new code before the release. Please
> keep testing our Cassandra connector and report errors or usability issues.
>
> I was not able to compile your code using Scala 2.10. However, I got the
> following version running (I basically changed the iterator into a List):
> I didn't get any type-related exceptions.
>
> import com.datastax.driver.core.Cluster
> import org.apache.flink.streaming.api.scala._
> import collection.JavaConverters._
> import com.datastax.driver.core.Cluster.Builder
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.cassandra.{ClusterBuilder, CassandraSink}
>
> object MLQ {
>   def main(args: Array[String]): Unit = {
>
>     val INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"
>
>     val list = List(new Tuple2("a", 1), new Tuple2("b", 2), new Tuple2("c", 3))
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val source = env.fromCollection(list.asJava)
>
>     CassandraSink.addSink(source)
>       .setQuery(INSERT)
>       .setClusterBuilder(new ClusterBuilder() {
>         override def buildCluster(builder: Builder): Cluster = {
>           builder.addContactPoint("127.0.0.1").build()
>         }
>       })
>       .build()
>
>     env.execute("WriteTupleIntoCassandra")
>   }
> }
>
> What I got is the following (and I think it's perfectly fine, given that I
> don't have a Cassandra cluster running):
>
> 11:41:33,513 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>       - class scala.Tuple2 must have a default constructor to be used as a
> POJO.
> 11:41:34,573 INFO
>  org.apache.flink.streaming.api.environment.LocalStreamEnvironment  -
> Running job on local embedded Flink mini cluster
> 11:41:34,975 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster
>       - Starting FlinkMiniCluster.
> 11:41:35,141 INFO  akka.event.slf4j.Slf4jLogger
>        - Slf4jLogger started
> 11:41:35,155 INFO  org.apache.flink.runtime.blob.BlobServer
>        - Created BLOB server storage directory
> /tmp/blobStore-01be4415-af52-4def-856e-94626e3f4f22
> 11:41:35,156 INFO  org.apache.flink.runtime.blob.BlobServer
>        - Started BLOB server at 0.0.0.0:40771 - max concurrent requests:
> 50 - max backlog: 1000
> 11:41:35,162 INFO
>  org.apache.flink.runtime.checkpoint.SavepointStoreFactory     - Using job
> manager savepoint state backend.
> 11:41:35,167 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist
>       - Started memory archivist akka://flink/user/archive_1
> 11:41:35,167 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Starting JobManager at akka://flink/user/jobmanager_1.
> 11:41:35,171 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - JobManager akka://flink/user/jobmanager_1 was granted leadership
> with leader session ID None.
> 11:41:35,175 INFO
>  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
> 11:41:35,180 INFO
>  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  - Resource Manager associating with leading JobManager
> Actor[akka://flink/user/jobmanager_1#-885000089] - leader session null
> 11:41:35,182 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Messages between TaskManager and JobManager have a max timeout of
> 10000 milliseconds
> 11:41:35,185 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Temporary file directory '/tmp': total 7 GB, usable 7 GB (100.00%
> usable)
> 11:41:35,397 INFO
>  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
> 64 MB for network buffer pool (number of memory segments: 2048, bytes per
> segment: 32768).
> 11:41:35,398 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Limiting managed memory to 1191 MB, memory will be allocated
> lazily.
> 11:41:35,400 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager
>        - I/O manager uses directory
> /tmp/flink-io-85973c2a-1672-4d1a-9985-03546acc4900 for spill files.
> 11:41:35,407 INFO  org.apache.flink.runtime.filecache.FileCache
>        - User file cache uses directory
> /tmp/flink-dist-cache-2775b3c6-ca0f-43a2-8432-623c8c99880c
> 11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Starting TaskManager actor at
> akka://flink/user/taskmanager_1#2005524421.
> 11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - TaskManager data connection information: localhost.localdomain
> (dataPort=34395)
> 11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - TaskManager has 8 task slot(s).
> 11:41:35,827 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Memory usage stats: [HEAP: 85/240/3541 MB, NON HEAP: 27/28/-1 MB
> (used/committed/max)]
> 11:41:36,025 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Trying to register at JobManager akka://flink/user/jobmanager_1
> (attempt 1, timeout: 500 milliseconds)
> 11:41:36,026 INFO
>  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  - TaskManager ResourceID{resourceId='0d1a407780ffa127acdd6b036c4867a8'}
> has registered.
> 11:41:36,028 INFO  org.apache.flink.runtime.instance.InstanceManager
>       - Registered TaskManager at localhost
> (akka://flink/user/taskmanager_1) as 6ef2a28a61851c9639ad74a7b3ba4cc8.
> Current number of registered hosts is 1. Current number of alive task slots
> is 8.
> 11:41:36,034 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Successful registration at JobManager
> (akka://flink/user/jobmanager_1), starting network stack and library cache.
> 11:41:36,039 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Determined BLOB server address to be localhost/127.0.0.1:40771.
> Starting BLOB cache.
> 11:41:36,040 INFO  org.apache.flink.runtime.blob.BlobCache
>       - Created BLOB cache storage directory
> /tmp/blobStore-295ac2e2-91af-4a9b-a761-60fbd1a86b78
> 11:41:36,042 INFO  org.apache.flink.metrics.MetricRegistry
>       - No metrics reporter configured, exposing metrics via JMX
> 11:41:36,051 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Received job WriteTupleIntoCassandra
> (502e42477e7a0161c3e678dcabbe1b0c).
> 11:41:36,052 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Could not submit job WriteTupleIntoCassandra
> (502e42477e7a0161c3e678dcabbe1b0c), because there is no connection to a
> JobManager.
> 11:41:36,052 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Disconnect from JobManager null.
> 11:41:36,054 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Connect to JobManager
> Actor[akka://flink/user/jobmanager_1#-885000089].
> 11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Connected to new JobManager akka://flink/user/jobmanager_1.
> 11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Sending message to JobManager akka://flink/user/jobmanager_1 to
> submit job WriteTupleIntoCassandra (502e42477e7a0161c3e678dcabbe1b0c) and
> wait for progress
> 11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Upload jar files to job manager akka://flink/user/jobmanager_1.
> 11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Submit job to the job manager akka://flink/user/jobmanager_1.
> 11:41:36,057 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Submitting job 502e42477e7a0161c3e678dcabbe1b0c
> (WriteTupleIntoCassandra).
> 11:41:36,095 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Using restart strategy NoRestartStrategy for
> 502e42477e7a0161c3e678dcabbe1b0c.
> 11:41:36,125 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Scheduling job 502e42477e7a0161c3e678dcabbe1b0c
> (WriteTupleIntoCassandra).
> 11:41:36,125 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Job was successfully submitted to the JobManager
> akka://flink/user/jobmanager_1.
> 11:41:36,126 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Job execution switched to status RUNNING.
> 06/21/2016 11:41:36 Job execution switched to status RUNNING.
> 11:41:36,127 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
> switched from CREATED to SCHEDULED
> 11:41:36,127 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to SCHEDULED
> 11:41:36,128 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Status of job 502e42477e7a0161c3e678dcabbe1b0c
> (WriteTupleIntoCassandra) changed to RUNNING.
> 11:41:36,130 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,130 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Source: Collection Source (1/1) (attempt #0) to localhost
> 11:41:36,130 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to
> DEPLOYING
> 11:41:36,132 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
> switched from CREATED to SCHEDULED
> 11:41:36,133 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,133 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (1/8) (attempt #0) to localhost
> 11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
> switched from CREATED to SCHEDULED
> 11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (2/8) (attempt #0) to localhost
> 11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
> switched from CREATED to SCHEDULED
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (3/8) (attempt #0) to localhost
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
> switched from CREATED to SCHEDULED
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (4/8) (attempt #0) to localhost
> 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to DEPLOYING
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
> switched from CREATED to SCHEDULED
> 11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to SCHEDULED
> 11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to DEPLOYING
> 11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to SCHEDULED
> 11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to DEPLOYING
> 11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to SCHEDULED
> 11:41:36,137 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (5/8) (attempt #0) to localhost
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to SCHEDULED
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to
> DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
> switched from CREATED to SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to SCHEDULED
> 11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to
> DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (6/8) (attempt #0) to localhost
> 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to SCHEDULED
> 11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
> switched from CREATED to SCHEDULED
> 11:41:36,139 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to DEPLOYING
> 11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,139 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to
> SCHEDULED
> 11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (7/8) (attempt #0) to localhost
> 11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
> switched from CREATED to SCHEDULED
> 11:41:36,140 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,140 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (8/8) (attempt #0) to localhost
> 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to SCHEDULED
> 11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to DEPLOYING
> 11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to SCHEDULED
> 11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to DEPLOYING
> 11:41:36,155 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Source: Collection Source (1/1)
> 11:41:36,157 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Source: Collection Source (1/1)
> 11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Source: Collection Source (1/1)
> [DEPLOYING]
> 11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (1/8)
> 11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (1/8)
> 11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (2/8)
> 11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (2/8)
> 11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (1/8) [DEPLOYING]
> 11:41:36,164 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (2/8) [DEPLOYING]
> 11:41:36,165 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (1/8) switched to RUNNING
> 11:41:36,165 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (2/8) switched to RUNNING
> 11:41:36,169 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Source: Collection Source (1/1) switched to RUNNING
> 11:41:36,170 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,170 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,170 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,170 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (3/8)
> 11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (4/8)
> 11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (3/8)
> 11:41:36,172 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (3/8) [DEPLOYING]
> 11:41:36,172 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (3/8) switched to RUNNING
> 11:41:36,173 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (4/8)
> 11:41:36,175 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (4/8) [DEPLOYING]
> 11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (4/8) switched to RUNNING
> 11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (5/8)
> 11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (5/8)
> 11:41:36,177 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,177 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (5/8) [DEPLOYING]
> 11:41:36,177 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (5/8) switched to RUNNING
> 11:41:36,177 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,178 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,178 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,179 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (6/8)
> 11:41:36,179 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,179 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,180 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (7/8)
> 11:41:36,181 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (8/8)
> 11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (7/8)
> 11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (7/8) [DEPLOYING]
> 11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (7/8) switched to RUNNING
> 11:41:36,183 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,183 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,184 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (6/8)
> 11:41:36,185 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (8/8)
> 11:41:36,186 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
> switched from DEPLOYING to RUNNING
> 11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (6/8) [DEPLOYING]
> 11:41:36,187 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to
> RUNNING
> 11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (6/8) switched to RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to RUNNING
> 11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (8/8) [DEPLOYING]
> 11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (8/8) switched to RUNNING
> 11:41:36,188 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,188 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,188 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,188 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,190 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
> switched from DEPLOYING to RUNNING
> 11:41:36,190 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
> switched from DEPLOYING to RUNNING
> 11:41:36,190 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to RUNNING
> 11:41:36,190 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to RUNNING
> 11:41:36,190 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,190 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,193 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
> switched from DEPLOYING to RUNNING
> 11:41:36,193 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
> switched from DEPLOYING to RUNNING
> 11:41:36,194 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
> switched from DEPLOYING to RUNNING
> 11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to
> RUNNING
> 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to RUNNING
> 11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to RUNNING
> 11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to RUNNING
> 11:41:36,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
> switched from DEPLOYING to RUNNING
> 11:41:36,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
> switched from DEPLOYING to RUNNING
> 11:41:36,195 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to RUNNING
> 11:41:36,195 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to
> RUNNING
> 11:41:36,196 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
> switched from DEPLOYING to RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to RUNNING
> 11:41:36,196 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to RUNNING
> 11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Source: Collection Source (1/1) switched to FINISHED
> 11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Source: Collection Source (1/1)
> 11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FINISHED to
> JobManager for task Source: Collection Source
> (3ab880aed927f4375ec55fcd76c05fb5)
> 11:41:36,207 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
> switched from RUNNING to FINISHED
> 11:41:36,207 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to
> FINISHED
> 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to FINISHED
> 11:41:36,373 INFO  com.datastax.driver.core.NettyUtil
>        - Found Netty's native epoll transport in the classpath, using it
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,516 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,896 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,896 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (3/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,898 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,899 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (6/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,899 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,899 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (4/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,904 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,906 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (5/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,910 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (7/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (8/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (1/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,911 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (2/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (5/8)
> 11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (2/8)
> 11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (7/8)
> 11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (8/8)
> 11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (1/8)
> 11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (3/8)
> 11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (4/8)
> 11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (6/8)
> 11:41:43,920 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (da635c97adfd28fdd8b7ff1553a653a6)
> 11:41:43,921 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (5b11c48da29711e75b8c1dee3491b9b1)
> 11:41:43,921 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (5dc9a026f642fe50c29cbd895152cf01)
> 11:41:43,922 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (2695a57c8a28412e62d58f7dbe379def)
> 11:41:43,924 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (d5d15f4eeb628ecc89751aa22bc9fbef)
> 11:41:43,925 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (696224c4e158a7925b2d8ae7fc17991f)
> 11:41:43,925 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (2fcf053fb92f9a4eccd75781a15b5d62)
> 11:41:43,926 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (31e4ff30d1360f1e4c10090c0a59a498)
> 11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
> switched from RUNNING to FAILED
> 11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
> switched from RUNNING to FAILED
> 11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
> switched from RUNNING to FAILED
> 11:41:43,927 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:43 Sink: Cassandra Sink(6/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
>
> 06/21/2016 11:41:43 Sink: Cassandra Sink(6/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
>
> 11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
> switched from RUNNING to FAILED
> 11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
> switched from RUNNING to FAILED
> 11:41:43,928 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:43 Sink: Cassandra Sink(5/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
>
> 11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
> switched from RUNNING to FAILED
> 06/21/2016 11:41:43 Sink: Cassandra Sink(5/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
>
> 11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
> switched from RUNNING to FAILED
> 11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
> switched from RUNNING to CANCELING
> 11:41:43,928 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:43 Sink: Cassandra Sink(3/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
>
> 06/21/2016 11:41:43 Sink: Cassandra Sink(3/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
>
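
The NoHostAvailableException in the quoted log only says that nothing accepted a connection on 127.0.0.1:9042. One quick way to confirm that before submitting the job is a plain TCP probe. The sketch below uses only java.net; the names CassandraReachability and canConnect are made up for illustration, and it only checks that the port accepts connections, not that Cassandra itself is healthy.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not part of Flink or the DataStax driver.
object CassandraReachability {

  /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers connection refused as well as connect timeouts.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // 127.0.0.1:9042 is the address the driver reports in the trace above.
    val reachable = canConnect("127.0.0.1", 9042)
    println(s"Cassandra contact point reachable: $reachable")
  }
}
```

If this prints false, the sink failing in open() is expected and says nothing about the tuple types.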

Re: Cassamdra Connector in Scala

Posted by Robert Metzger <rm...@apache.org>.
Thank you for reporting the issue.
We are very happy if people try out new code before the release. Please
keep testing our Cassandra connector and report errors or usability issues.

I was not able to compile your code using Scala 2.10. However, I got the
following version running (I basically changed the iterator into a List),
and I didn't get any type-related exceptions:

import com.datastax.driver.core.Cluster
import org.apache.flink.streaming.api.scala._
import collection.JavaConverters._
import com.datastax.driver.core.Cluster.Builder
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.cassandra.{ClusterBuilder, CassandraSink}

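object MLQ {
  def main(args: Array[String]): Unit = {
    // CQL insert statement with two bind markers.
    val INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"

    // With the imports above, Tuple2 resolves to scala.Tuple2.
    val list = List(new Tuple2("a", 1), new Tuple2("b", 2), new Tuple2("c", 3))

    // The explicit import selects the Java StreamExecutionEnvironment,
    // so the Scala List is converted to a java.util.List first.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.fromCollection(list.asJava)

    CassandraSink.addSink(source)
      .setQuery(INSERT)
      .setClusterBuilder(new ClusterBuilder() {
        // Connect to a Cassandra node on the local machine.
        override def buildCluster(builder: Builder): Cluster = {
          builder.addContactPoint("127.0.0.1").build()
        }
      })
      .build()

    env.execute("WriteTupleIntoCassandra")
  }
}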
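
For comparison, here is a minimal sketch of the same job built on Flink's own Java tuple type (org.apache.flink.api.java.tuple.Tuple2) instead of scala.Tuple2. It is an assumption, based on the TypeExtractor message and the CassandraPojoSink frames in the logs, that scala.Tuple2 values passed through the Java API end up on the POJO path rather than the tuple path. The object name WriteFlinkTuples and the FlinkTuple2 alias are made up for illustration, and the sketch has not been run against a real cluster.

```scala
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.Cluster.Builder
import org.apache.flink.api.java.tuple.{Tuple2 => FlinkTuple2}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.cassandra.{CassandraSink, ClusterBuilder}
import scala.collection.JavaConverters._

// Hypothetical variant of the example above; only the tuple type differs.
object WriteFlinkTuples {
  def main(args: Array[String]): Unit = {
    val INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"

    // Flink Java tuples with boxed field types (Integer rather than Int).
    val list = List(
      new FlinkTuple2[String, Integer]("a", 1),
      new FlinkTuple2[String, Integer]("b", 2),
      new FlinkTuple2[String, Integer]("c", 3))

    // Java StreamExecutionEnvironment, so a java.util.List is required.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.fromCollection(list.asJava)

    CassandraSink.addSink(source)
      .setQuery(INSERT)
      .setClusterBuilder(new ClusterBuilder() {
        override def buildCluster(builder: Builder): Cluster =
          builder.addContactPoint("127.0.0.1").build()
      })
      .build()

    // Still needs a Cassandra node on 127.0.0.1 to get past the sink's open().
    env.execute("WriteFlinkTupleIntoCassandra")
  }
}
```

Without a running Cassandra node this variant should fail with the same NoHostAvailableException, so it only helps to separate type problems from connection problems.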

What I got is the following (and I think it's perfectly fine, given that I
don't have a Cassandra cluster running):

11:41:33,513 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
      - class scala.Tuple2 must have a default constructor to be used as a
POJO.
11:41:34,573 INFO
 org.apache.flink.streaming.api.environment.LocalStreamEnvironment  -
Running job on local embedded Flink mini cluster
11:41:34,975 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster
      - Starting FlinkMiniCluster.
11:41:35,141 INFO  akka.event.slf4j.Slf4jLogger
     - Slf4jLogger started
11:41:35,155 INFO  org.apache.flink.runtime.blob.BlobServer
     - Created BLOB server storage directory
/tmp/blobStore-01be4415-af52-4def-856e-94626e3f4f22
11:41:35,156 INFO  org.apache.flink.runtime.blob.BlobServer
     - Started BLOB server at 0.0.0.0:40771 - max concurrent requests: 50 -
max backlog: 1000
11:41:35,162 INFO
 org.apache.flink.runtime.checkpoint.SavepointStoreFactory     - Using job
manager savepoint state backend.
11:41:35,167 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist
      - Started memory archivist akka://flink/user/archive_1
11:41:35,167 INFO  org.apache.flink.runtime.jobmanager.JobManager
     - Starting JobManager at akka://flink/user/jobmanager_1.
11:41:35,171 INFO  org.apache.flink.runtime.jobmanager.JobManager
     - JobManager akka://flink/user/jobmanager_1 was granted leadership
with leader session ID None.
11:41:35,175 INFO
 org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
11:41:35,180 INFO
 org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 - Resource Manager associating with leading JobManager
Actor[akka://flink/user/jobmanager_1#-885000089] - leader session null
11:41:35,182 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Messages between TaskManager and JobManager have a max timeout of
10000 milliseconds
11:41:35,185 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Temporary file directory '/tmp': total 7 GB, usable 7 GB (100.00%
usable)
11:41:35,397 INFO
 org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
64 MB for network buffer pool (number of memory segments: 2048, bytes per
segment: 32768).
11:41:35,398 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Limiting managed memory to 1191 MB, memory will be allocated lazily.
11:41:35,400 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager
     - I/O manager uses directory
/tmp/flink-io-85973c2a-1672-4d1a-9985-03546acc4900 for spill files.
11:41:35,407 INFO  org.apache.flink.runtime.filecache.FileCache
     - User file cache uses directory
/tmp/flink-dist-cache-2775b3c6-ca0f-43a2-8432-623c8c99880c
11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Starting TaskManager actor at
akka://flink/user/taskmanager_1#2005524421.
11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - TaskManager data connection information: localhost.localdomain
(dataPort=34395)
11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - TaskManager has 8 task slot(s).
11:41:35,827 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Memory usage stats: [HEAP: 85/240/3541 MB, NON HEAP: 27/28/-1 MB
(used/committed/max)]
11:41:36,025 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Trying to register at JobManager akka://flink/user/jobmanager_1
(attempt 1, timeout: 500 milliseconds)
11:41:36,026 INFO
 org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 - TaskManager ResourceID{resourceId='0d1a407780ffa127acdd6b036c4867a8'}
has registered.
11:41:36,028 INFO  org.apache.flink.runtime.instance.InstanceManager
      - Registered TaskManager at localhost
(akka://flink/user/taskmanager_1) as 6ef2a28a61851c9639ad74a7b3ba4cc8.
Current number of registered hosts is 1. Current number of alive task slots
is 8.
11:41:36,034 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Successful registration at JobManager
(akka://flink/user/jobmanager_1), starting network stack and library cache.
11:41:36,039 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Determined BLOB server address to be localhost/127.0.0.1:40771.
Starting BLOB cache.
11:41:36,040 INFO  org.apache.flink.runtime.blob.BlobCache
      - Created BLOB cache storage directory
/tmp/blobStore-295ac2e2-91af-4a9b-a761-60fbd1a86b78
11:41:36,042 INFO  org.apache.flink.metrics.MetricRegistry
      - No metrics reporter configured, exposing metrics via JMX
11:41:36,051 INFO  org.apache.flink.runtime.client.JobClientActor
     - Received job WriteTupleIntoCassandra
(502e42477e7a0161c3e678dcabbe1b0c).
11:41:36,052 INFO  org.apache.flink.runtime.client.JobClientActor
     - Could not submit job WriteTupleIntoCassandra
(502e42477e7a0161c3e678dcabbe1b0c), because there is no connection to a
JobManager.
11:41:36,052 INFO  org.apache.flink.runtime.client.JobClientActor
     - Disconnect from JobManager null.
11:41:36,054 INFO  org.apache.flink.runtime.client.JobClientActor
     - Connect to JobManager
Actor[akka://flink/user/jobmanager_1#-885000089].
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
     - Connected to new JobManager akka://flink/user/jobmanager_1.
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
     - Sending message to JobManager akka://flink/user/jobmanager_1 to
submit job WriteTupleIntoCassandra (502e42477e7a0161c3e678dcabbe1b0c) and
wait for progress
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
     - Upload jar files to job manager akka://flink/user/jobmanager_1.
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
     - Submit job to the job manager akka://flink/user/jobmanager_1.
11:41:36,057 INFO  org.apache.flink.runtime.jobmanager.JobManager
     - Submitting job 502e42477e7a0161c3e678dcabbe1b0c
(WriteTupleIntoCassandra).
11:41:36,095 INFO  org.apache.flink.runtime.jobmanager.JobManager
     - Using restart strategy NoRestartStrategy for
502e42477e7a0161c3e678dcabbe1b0c.
11:41:36,125 INFO  org.apache.flink.runtime.jobmanager.JobManager
     - Scheduling job 502e42477e7a0161c3e678dcabbe1b0c
(WriteTupleIntoCassandra).
11:41:36,125 INFO  org.apache.flink.runtime.client.JobClientActor
     - Job was successfully submitted to the JobManager
akka://flink/user/jobmanager_1.
11:41:36,126 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Job execution switched to status RUNNING.
06/21/2016 11:41:36 Job execution switched to status RUNNING.
11:41:36,127 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
switched from CREATED to SCHEDULED
11:41:36,127 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to
SCHEDULED
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to SCHEDULED
11:41:36,128 INFO  org.apache.flink.runtime.jobmanager.JobManager
     - Status of job 502e42477e7a0161c3e678dcabbe1b0c
(WriteTupleIntoCassandra) changed to RUNNING.
11:41:36,130 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
switched from SCHEDULED to DEPLOYING
11:41:36,130 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Deploying Source: Collection Source (1/1) (attempt #0) to localhost
11:41:36,130 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to
DEPLOYING
11:41:36,132 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
switched from CREATED to SCHEDULED
11:41:36,133 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
switched from SCHEDULED to DEPLOYING
11:41:36,133 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Deploying Sink: Cassandra Sink (1/8) (attempt #0) to localhost
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
switched from CREATED to SCHEDULED
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
switched from SCHEDULED to DEPLOYING
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Deploying Sink: Cassandra Sink (2/8) (attempt #0) to localhost
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
switched from CREATED to SCHEDULED
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
switched from SCHEDULED to DEPLOYING
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Deploying Sink: Cassandra Sink (3/8) (attempt #0) to localhost
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
switched from CREATED to SCHEDULED
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
switched from SCHEDULED to DEPLOYING
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Deploying Sink: Cassandra Sink (4/8) (attempt #0) to localhost
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to DEPLOYING
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
switched from CREATED to SCHEDULED
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to SCHEDULED
06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to SCHEDULED
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to DEPLOYING
06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to DEPLOYING
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to SCHEDULED
06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to SCHEDULED
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to DEPLOYING
06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to DEPLOYING
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to SCHEDULED
06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to SCHEDULED
11:41:36,137 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
switched from SCHEDULED to DEPLOYING
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to DEPLOYING
06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to DEPLOYING
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Deploying Sink: Cassandra Sink (5/8) (attempt #0) to localhost
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to SCHEDULED
06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to SCHEDULED
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to DEPLOYING
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
switched from CREATED to SCHEDULED
06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to DEPLOYING
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to SCHEDULED
06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to SCHEDULED
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
switched from SCHEDULED to DEPLOYING
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to DEPLOYING
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Deploying Sink: Cassandra Sink (6/8) (attempt #0) to localhost
06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to DEPLOYING
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to SCHEDULED
06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to SCHEDULED
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
switched from CREATED to SCHEDULED
11:41:36,139 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to DEPLOYING
06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to DEPLOYING
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
switched from SCHEDULED to DEPLOYING
11:41:36,139 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to SCHEDULED
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Deploying Sink: Cassandra Sink (7/8) (attempt #0) to localhost
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
switched from CREATED to SCHEDULED
11:41:36,140 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
switched from SCHEDULED to DEPLOYING
11:41:36,140 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Deploying Sink: Cassandra Sink (8/8) (attempt #0) to localhost
06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to SCHEDULED
11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to DEPLOYING
06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to DEPLOYING
11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to SCHEDULED
06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to SCHEDULED
11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to DEPLOYING
06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to DEPLOYING
11:41:36,155 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Received task Source: Collection Source (1/1)
11:41:36,157 INFO  org.apache.flink.runtime.taskmanager.Task
      - Loading JAR files for task Source: Collection Source (1/1)
11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.Task
      - Registering task at network: Source: Collection Source (1/1)
[DEPLOYING]
11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Received task Sink: Cassandra Sink (1/8)
11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.Task
      - Loading JAR files for task Sink: Cassandra Sink (1/8)
11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Received task Sink: Cassandra Sink (2/8)
11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.Task
      - Loading JAR files for task Sink: Cassandra Sink (2/8)
11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.Task
      - Registering task at network: Sink: Cassandra Sink (1/8) [DEPLOYING]
11:41:36,164 INFO  org.apache.flink.runtime.taskmanager.Task
      - Registering task at network: Sink: Cassandra Sink (2/8) [DEPLOYING]
11:41:36,165 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (1/8) switched to RUNNING
11:41:36,165 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (2/8) switched to RUNNING
11:41:36,169 INFO  org.apache.flink.runtime.taskmanager.Task
      - Source: Collection Source (1/1) switched to RUNNING
11:41:36,170 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
      - No state backend has been specified, using default state backend
(Memory / JobManager)
11:41:36,170 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
      - No state backend has been specified, using default state backend
(Memory / JobManager)
11:41:36,170 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
      - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,170 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
      - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Received task Sink: Cassandra Sink (3/8)
11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Received task Sink: Cassandra Sink (4/8)
11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.Task
      - Loading JAR files for task Sink: Cassandra Sink (3/8)
11:41:36,172 INFO  org.apache.flink.runtime.taskmanager.Task
      - Registering task at network: Sink: Cassandra Sink (3/8) [DEPLOYING]
11:41:36,172 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (3/8) switched to RUNNING
11:41:36,173 INFO  org.apache.flink.runtime.taskmanager.Task
      - Loading JAR files for task Sink: Cassandra Sink (4/8)
11:41:36,175 INFO  org.apache.flink.runtime.taskmanager.Task
      - Registering task at network: Sink: Cassandra Sink (4/8) [DEPLOYING]
11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (4/8) switched to RUNNING
11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Received task Sink: Cassandra Sink (5/8)
11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.Task
      - Loading JAR files for task Sink: Cassandra Sink (5/8)
11:41:36,177 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
      - No state backend has been specified, using default state backend
(Memory / JobManager)
11:41:36,177 INFO  org.apache.flink.runtime.taskmanager.Task
      - Registering task at network: Sink: Cassandra Sink (5/8) [DEPLOYING]
11:41:36,177 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (5/8) switched to RUNNING
11:41:36,177 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
      - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,178 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
      - No state backend has been specified, using default state backend
(Memory / JobManager)
11:41:36,178 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
      - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,179 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Received task Sink: Cassandra Sink (6/8)
11:41:36,179 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
      - No state backend has been specified, using default state backend
(Memory / JobManager)
11:41:36,179 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
      - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,180 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Received task Sink: Cassandra Sink (7/8)
11:41:36,181 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Received task Sink: Cassandra Sink (8/8)
11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task
      - Loading JAR files for task Sink: Cassandra Sink (7/8)
11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task
      - Registering task at network: Sink: Cassandra Sink (7/8) [DEPLOYING]
11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (7/8) switched to RUNNING
11:41:36,183 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
      - No state backend has been specified, using default state backend
(Memory / JobManager)
11:41:36,183 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
      - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,184 INFO  org.apache.flink.runtime.taskmanager.Task
      - Loading JAR files for task Sink: Cassandra Sink (6/8)
11:41:36,185 INFO  org.apache.flink.runtime.taskmanager.Task
      - Loading JAR files for task Sink: Cassandra Sink (8/8)
11:41:36,186 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
switched from DEPLOYING to RUNNING
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
      - Registering task at network: Sink: Cassandra Sink (6/8) [DEPLOYING]
11:41:36,187 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to RUNNING
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (6/8) switched to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to RUNNING
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
      - Registering task at network: Sink: Cassandra Sink (8/8) [DEPLOYING]
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (8/8) switched to RUNNING
11:41:36,188 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
      - No state backend has been specified, using default state backend
(Memory / JobManager)
11:41:36,188 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
      - No state backend has been specified, using default state backend
(Memory / JobManager)
11:41:36,188 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
      - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,188 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
      - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,190 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
switched from DEPLOYING to RUNNING
11:41:36,190 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
switched from DEPLOYING to RUNNING
11:41:36,190 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to RUNNING
11:41:36,190 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to RUNNING
11:41:36,190 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
      - No state backend has been specified, using default state backend
(Memory / JobManager)
11:41:36,190 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
      - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,193 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
switched from DEPLOYING to RUNNING
11:41:36,193 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
switched from DEPLOYING to RUNNING
11:41:36,194 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
switched from DEPLOYING to RUNNING
11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to RUNNING
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to RUNNING
11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to RUNNING
11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to RUNNING
11:41:36,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
switched from DEPLOYING to RUNNING
11:41:36,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
switched from DEPLOYING to RUNNING
11:41:36,195 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to RUNNING
11:41:36,195 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to RUNNING
11:41:36,196 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
switched from DEPLOYING to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to RUNNING
11:41:36,196 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to RUNNING
11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.Task
      - Source: Collection Source (1/1) switched to FINISHED
11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.Task
      - Freeing task resources for Source: Collection Source (1/1)
11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Collection Source (3ab880aed927f4375ec55fcd76c05fb5)
11:41:36,207 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
switched from RUNNING to FINISHED
11:41:36,207 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to FINISHED
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to FINISHED
11:41:36,373 INFO  com.datastax.driver.core.NettyUtil
     - Found Netty's native epoll transport in the classpath, using it
11:41:41,515 ERROR
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
while closing session.
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,516 ERROR
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
while closing session.
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
while closing session.
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
while closing session.
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
while closing session.
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
while closing session.
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
while closing session.
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
while closing session.
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,896 ERROR org.apache.flink.runtime.taskmanager.Task
      - Task execution failed.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,896 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (3/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,898 ERROR org.apache.flink.runtime.taskmanager.Task
      - Task execution failed.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,899 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (6/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,899 ERROR org.apache.flink.runtime.taskmanager.Task
      - Task execution failed.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,899 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (4/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,904 ERROR org.apache.flink.runtime.taskmanager.Task
      - Task execution failed.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,906 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (5/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task
      - Task execution failed.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 ERROR org.apache.flink.runtime.taskmanager.Task
      - Task execution failed.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task
      - Task execution failed.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (7/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task
      - Task execution failed.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (8/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (1/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,911 INFO  org.apache.flink.runtime.taskmanager.Task
      - Sink: Cassandra Sink (2/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
      - Freeing task resources for Sink: Cassandra Sink (5/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
      - Freeing task resources for Sink: Cassandra Sink (2/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
      - Freeing task resources for Sink: Cassandra Sink (7/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
      - Freeing task resources for Sink: Cassandra Sink (8/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
      - Freeing task resources for Sink: Cassandra Sink (1/8)
11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task
      - Freeing task resources for Sink: Cassandra Sink (3/8)
11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task
      - Freeing task resources for Sink: Cassandra Sink (4/8)
11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task
      - Freeing task resources for Sink: Cassandra Sink (6/8)
11:41:43,920 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Cassandra Sink (da635c97adfd28fdd8b7ff1553a653a6)
11:41:43,921 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Cassandra Sink (5b11c48da29711e75b8c1dee3491b9b1)
11:41:43,921 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Cassandra Sink (5dc9a026f642fe50c29cbd895152cf01)
11:41:43,922 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Cassandra Sink (2695a57c8a28412e62d58f7dbe379def)
11:41:43,924 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Cassandra Sink (d5d15f4eeb628ecc89751aa22bc9fbef)
11:41:43,925 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Cassandra Sink (696224c4e158a7925b2d8ae7fc17991f)
11:41:43,925 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Cassandra Sink (2fcf053fb92f9a4eccd75781a15b5d62)
11:41:43,926 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Cassandra Sink (31e4ff30d1360f1e4c10090c0a59a498)
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:43 Sink: Cassandra Sink(6/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(6/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
switched from RUNNING to FAILED
11:41:43,928 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:43 Sink: Cassandra Sink(5/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
switched from RUNNING to FAILED
06/21/2016 11:41:43 Sink: Cassandra Sink(5/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
switched from RUNNING to FAILED
11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
switched from RUNNING to CANCELING
11:41:43,928 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:43 Sink: Cassandra Sink(3/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(3/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,929 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:43 Sink: Cassandra Sink(1/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(1/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,929 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Discarding the results produced by task execution
3ab880aed927f4375ec55fcd76c05fb5
11:41:43,929 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:43 Job execution switched to status FAILING.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
06/21/2016 11:41:43 Job execution switched to status FAILING.
11:41:43,931 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
switched from CANCELING to CANCELED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,931 INFO  org.apache.flink.runtime.jobmanager.JobManager
     - Status of job 502e42477e7a0161c3e678dcabbe1b0c
(WriteTupleIntoCassandra) changed to FAILING.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,932 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:43 Sink: Cassandra Sink(4/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(4/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,932 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:43 Sink: Cassandra Sink(7/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(7/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
tried for query failed (tried: /127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
Cannot connect))
at
com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at
com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,932 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:43 Sink: Cassandra Sink(2/8) switched to CANCELING
06/21/2016 11:41:43 Sink: Cassandra Sink(2/8) switched to CANCELING
11:41:43,932 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:43 Sink: Cassandra Sink(8/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(8/8) switched to FAILED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,933 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Discarding the results produced by task execution
d5d15f4eeb628ecc89751aa22bc9fbef
11:41:43,933 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:43 Sink: Cassandra Sink(2/8) switched to CANCELED
06/21/2016 11:41:43 Sink: Cassandra Sink(2/8) switched to CANCELED
11:41:43,933 INFO  org.apache.flink.runtime.jobmanager.JobManager
     - Status of job 502e42477e7a0161c3e678dcabbe1b0c
(WriteTupleIntoCassandra) changed to FAILED.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,933 INFO  org.apache.flink.runtime.client.JobClientActor
     - 06/21/2016 11:41:43 Job execution switched to status FAILED.
06/21/2016 11:41:43 Job execution switched to status FAILED.
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Discarding the results produced by task execution
2695a57c8a28412e62d58f7dbe379def
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Discarding the results produced by task execution
5dc9a026f642fe50c29cbd895152cf01
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Discarding the results produced by task execution
da635c97adfd28fdd8b7ff1553a653a6
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Discarding the results produced by task execution
5b11c48da29711e75b8c1dee3491b9b1
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Discarding the results produced by task execution
2fcf053fb92f9a4eccd75781a15b5d62
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Discarding the results produced by task execution
696224c4e158a7925b2d8ae7fc17991f
11:41:43,937 INFO  org.apache.flink.runtime.client.JobClientActor
     - Terminate JobClientActor.
11:41:43,937 INFO  org.apache.flink.runtime.client.JobClient
      - Job execution failed
11:41:43,937 INFO  org.apache.flink.runtime.client.JobClientActor
     - Disconnect from JobManager
Actor[akka://flink/user/jobmanager_1#-885000089].
11:41:43,937 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster
      - Stopping FlinkMiniCluster.
11:41:43,940 INFO  org.apache.flink.runtime.jobmanager.JobManager
     - Stopping JobManager akka://flink/user/jobmanager_1.
11:41:43,940 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Stopping TaskManager akka://flink/user/taskmanager_1#2005524421.
11:41:43,941 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Disassociating from JobManager
11:41:43,942 INFO  org.apache.flink.runtime.blob.BlobCache
      - Shutting down BlobCache
11:41:43,945 INFO  org.apache.flink.runtime.blob.BlobServer
     - Stopped BLOB server at 0.0.0.0:40771
11:41:43,947 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager
     - I/O manager removed spill file directory
/tmp/flink-io-85973c2a-1672-4d1a-9985-03546acc4900
11:41:43,948 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Task manager akka://flink/user/taskmanager_1 is completely shut down.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

Process finished with exit code 1
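
The NoHostAvailableException in the log above reports that nothing could be reached at 127.0.0.1:9042, which usually just means no Cassandra node is listening there. A minimal sketch for checking that before submitting the job, using only java.net from the JDK (the object name CassandraPortCheck and the helper canConnect are made up for this illustration and are not part of Flink or the DataStax driver):

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not part of Flink or the Cassandra driver.
object CassandraPortCheck {

  // Returns true if a plain TCP connection to host:port succeeds within timeoutMs.
  // This only shows that something is listening; it does not speak the CQL protocol.
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers connection refused, timeouts and unresolvable hosts.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // 127.0.0.1:9042 is the contact point and port shown in the log above.
    val reachable = canConnect("127.0.0.1", 9042)
    println(s"127.0.0.1:9042 reachable: $reachable")
  }
}
```

If this reports false, the sink will presumably fail in open() the same way regardless of how the tuple types are declared.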


On Tue, Jun 21, 2016 at 6:36 AM, Eamon Kavanagh <ka...@gmail.com>
wrote:

> Hey Jamie,
>
> Here's a simple example that I modeled off of the github example (
> https://github.com/apache/flink/blob/master/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java).
> Let me know if I'm doing something silly.
>
>
>
> import com.datastax.driver.core.Cluster.Builder
> import org.apache.flink.api.java.tuple.Tuple2
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.cassandra.CassandraSink
> import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder
>
> class Test extends App {
>
>   val INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"
>
>   val iter = Iterator(new Tuple2("a", 1), new Tuple2("b", 2), new Tuple2("c", 3))
>
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val source = env.fromCollection(iter)
>
>   CassandraSink.addSink(source)
>     .setQuery(INSERT)
>     .setClusterBuilder(new ClusterBuilder() {
>       override def buildCluster(builder: Builder) {
>         builder.addContactPoint("127.0.0.1").build()
>       }
>     })
>     .build()
>
>   env.execute("WriteTupleIntoCassandra")
> }
>
>
> On Mon, Jun 20, 2016 at 10:53 PM, Jamie Grier <ja...@data-artisans.com>
> wrote:
>
>> This looks like a simple type mismatch.  It's impossible to help with
>> this without seeing your code, though.  Can you post it here?  Thanks.
>>
>> -Jamie
>>
>>
>> On Sun, Jun 19, 2016 at 3:17 PM, Eamon Kavanagh <
>> kavanagh.c.eamon@gmail.com> wrote:
>>
>>> Hey Mailing List,
>>>
>>> I'm trying to use the Cassandra connector that came out recently (
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/cassandra.html)
>>> in Scala but I'm having trouble with types when I use
>>> CassandraSink.addSink(in: DataStream).
>>>
>>> If I don't define the type it can't seem to properly infer it and if I
>>> do define the type I still get an error saying there's a type mismatch.
> >>> The compile error is
>>>
>>> *error: type arguments [(String, String, Int),Any] do not conform to
>>> method addSink's type parameter bounds [IN,T <:
>>> org.apache.flink.api.java.tuple.Tuple]*
>>>
>>> Is this a Scala issue?  Should I switch over to Java?
>>>
>>>
>>> Thanks!
>>> Eamon
>>>
>>
>>
>>
>> --
>>
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier <https://twitter.com/jamiegrier>
>> jamie@data-artisans.com
>>
>>
>

Re: Cassamdra Connector in Scala

Posted by Eamon Kavanagh <ka...@gmail.com>.
Hey Jamie,

Here's a simple example that I modeled off of the github example (
https://github.com/apache/flink/blob/master/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java).
Let me know if I'm doing something silly.



import com.datastax.driver.core.Cluster.Builder
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder

class Test extends App {

  val INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"

  val iter = Iterator(new Tuple2("a", 1), new Tuple2("b", 2), new Tuple2("c", 3))

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val source = env.fromCollection(iter)

  CassandraSink.addSink(source)
    .setQuery(INSERT)
    .setClusterBuilder(new ClusterBuilder() {
      override def buildCluster(builder: Builder) {
        builder.addContactPoint("127.0.0.1").build()
      }
    })
    .build()

  env.execute("WriteTupleIntoCassandra")
}


On Mon, Jun 20, 2016 at 10:53 PM, Jamie Grier <ja...@data-artisans.com>
wrote:

> This looks like a simple type mismatch.  It's impossible to help with this
> without seeing your code, though.  Can you post it here?  Thanks.
>
> -Jamie
>
>
> On Sun, Jun 19, 2016 at 3:17 PM, Eamon Kavanagh <
> kavanagh.c.eamon@gmail.com> wrote:
>
>> Hey Mailing List,
>>
>> I'm trying to use the Cassandra connector that came out recently (
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/cassandra.html)
>> in Scala but I'm having trouble with types when I use
>> CassandraSink.addSink(in: DataStream).
>>
>> If I don't define the type it can't seem to properly infer it and if I do
>> define the type I still get an error saying there's a type mismatch.  The
>> compile error is
>>
>> *error: type arguments [(String, String, Int),Any] do not conform to
>> method addSink's type parameter bounds [IN,T <:
>> org.apache.flink.api.java.tuple.Tuple]*
>>
>> Is this a Scala issue?  Should I switch over to Java?
>>
>>
>> Thanks!
>> Eamon
>>
>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> jamie@data-artisans.com
>
>

Re: Cassamdra Connector in Scala

Posted by Jamie Grier <ja...@data-artisans.com>.
This looks like a simple type mismatch.  It's impossible to help with this
without seeing your code, though.  Can you post it here?  Thanks.

-Jamie
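
For readers following along, here is a rough sketch of the kind of mismatch the quoted compiler error describes: a type parameter bounded by a Java-style tuple base class does not accept a Scala tuple such as (String, String, Int). The types JTuple and JTuple3 and the method addSink below are stand-ins written for this illustration so that it runs without Flink. They only mimic the shape of org.apache.flink.api.java.tuple.Tuple and are not the connector's actual signature.

```scala
// Stand-in for a Java-style tuple base class (assumption, not the Flink class).
abstract class JTuple

// Stand-in for a three-field Java-style tuple with f0/f1/f2 fields.
final case class JTuple3[A, B, C](f0: A, f1: B, f2: C) extends JTuple

object BoundSketch {

  // Hypothetical sink whose element type must be a subtype of JTuple.
  // Returns the number of records it was given.
  def addSink[T <: JTuple](records: Seq[T]): Int = records.size

  // Converts a Scala tuple into the stand-in Java-style tuple.
  def toJTuple(t: (String, String, Int)): JTuple3[String, String, Int] =
    JTuple3(t._1, t._2, t._3)

  def main(args: Array[String]): Unit = {
    val scalaTuples = Seq(("a", "x", 1), ("b", "y", 2))

    // addSink(scalaTuples)
    // The line above is rejected by the compiler, because
    // (String, String, Int) is scala.Tuple3 and not a subtype of JTuple.

    // Mapping each element to the bounded type first satisfies the bound.
    val accepted = addSink(scalaTuples.map(toJTuple))
    println(s"accepted $accepted records")
  }
}
```

Under these assumptions, the equivalent step in a real job would be to map the stream's elements to Flink's own tuple classes before handing it to the sink.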


On Sun, Jun 19, 2016 at 3:17 PM, Eamon Kavanagh <ka...@gmail.com>
wrote:

> Hey Mailing List,
>
> I'm trying to use the Cassandra connector that came out recently (
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/cassandra.html)
> in Scala but I'm having trouble with types when I use
> CassandraSink.addSink(in: DataStream).
>
> If I don't define the type it can't seem to properly infer it and if I do
> define the type I still get an error saying there's a type mismatch.  The
> compile error is
>
> *error: type arguments [(String, String, Int),Any] do not conform to
> method addSink's type parameter bounds [IN,T <:
> org.apache.flink.api.java.tuple.Tuple]*
>
> Is this a Scala issue?  Should I switch over to Java?
>
>
> Thanks!
> Eamon
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com