Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/06/03 13:10:59 UTC

[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

    [ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15314074#comment-15314074 ] 

ASF GitHub Bot commented on FLINK-3311:
---------------------------------------

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/1771
  
    I've reviewed the connector again.
    The issues I've seen previously (failure on restart) are resolved.
    However, I found new issues:
    - The Cassandra sink doesn't fail (at least not within 15 minutes) if Cassandra is no longer available. It's probably just a configuration setting of the Cassandra driver to make it fail after a certain amount of time.
    - We should probably introduce a configurable limit (number of records or size in GB) for the write-ahead log. It seemed to me that, because of the failed other instance, no checkpoints were able to complete anymore (some of the Cassandra sinks were stuck in notifyCheckpointComplete()), while others kept accepting data into the WAL. This led to a lot of data being written into the state backend. I think the Cassandra sink should stop at some point in such a situation.
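
    A minimal sketch of what such a limit could look like, assuming the sink buffers serialized records between checkpoints. The class `BoundedWriteAheadBuffer` and its parameters `maxRecords` and `maxBytes` are hypothetical names for illustration; they are not part of the connector or of GenericWriteAheadSink.

    ```java
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Hypothetical sketch: not part of Flink or the Cassandra connector.
    final class BoundedWriteAheadBuffer {
        private final long maxRecords;  // assumed configurable record limit
        private final long maxBytes;    // assumed configurable size limit
        private final Deque<byte[]> pending = new ArrayDeque<>();
        private long pendingBytes = 0;

        BoundedWriteAheadBuffer(long maxRecords, long maxBytes) {
            this.maxRecords = maxRecords;
            this.maxBytes = maxBytes;
        }

        /** Buffers one serialized record, or throws if a limit would be exceeded. */
        void append(byte[] serializedRecord) {
            if (pending.size() + 1 > maxRecords
                    || pendingBytes + serializedRecord.length > maxBytes) {
                throw new IllegalStateException(
                    "Write-ahead buffer limit reached: " + pending.size()
                    + " records, " + pendingBytes + " bytes pending");
            }
            pending.addLast(serializedRecord);
            pendingBytes += serializedRecord.length;
        }

        /** To be called once the buffered records were committed; frees the buffer. */
        void clearCommitted() {
            pending.clear();
            pendingBytes = 0;
        }

        int pendingRecords() { return pending.size(); }

        long pendingBytes() { return pendingBytes; }
    }
    ```

    Throwing from `append` would fail the task instead of letting the buffered data grow without bound; whether failing or applying backpressure is the better reaction is a design decision left open here.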
    
    Also, I would like to test the exactly-once behavior on a cluster more thoroughly. So far, I've only tested whether the connector fails and restores properly, but I haven't tested whether the written data is actually correct.
    
    However, since the code seems to be working under normal operation, I suggest merging the connector now and then filing follow-up JIRAs for the remaining issues.
    This makes collaboration and reviews easier and allows our users to help test the Cassandra connector.
    
    
    
    Some log output:
    ```
    2016-06-03 12:28:36,478 ERROR org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink  - Error while sending value.
    com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
    	at com.datastax.driver.core.exceptions.UnavailableException.copy(UnavailableException.java:128)
    	at com.datastax.driver.core.Responses$Error.asException(Responses.java:114)
    	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477)
    	at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
    	at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
    	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
    	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
    	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
    	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    	at java.lang.Thread.run(Thread.java:745)
    Caused by: com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
    	at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:50)
    	at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
    	at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266)
    	at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246)
    	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    	... 11 more
    2016-06-03 12:28:57,473 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
    2016-06-03 12:28:57,487 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
    2016-06-03 12:29:02,939 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
    2016-06-03 12:29:02,970 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
    2016-06-03 12:29:12,945 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 4000 milliseconds
    2016-06-03 12:29:12,974 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 4000 milliseconds
    2016-06-03 12:29:17,947 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 8000 milliseconds
    2016-06-03 12:29:17,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 8000 milliseconds
    2016-06-03 12:29:28,481 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 16000 milliseconds
    2016-06-03 12:29:28,974 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 16000 milliseconds
    2016-06-03 12:29:44,482 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 32000 milliseconds
    2016-06-03 12:29:44,975 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 32000 milliseconds
    2016-06-03 12:30:16,482 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 64000 milliseconds
    2016-06-03 12:30:16,975 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 64000 milliseconds
    2016-06-03 12:31:20,483 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 128000 milliseconds
    2016-06-03 12:31:20,976 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 128000 milliseconds
    2016-06-03 12:33:28,484 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 256000 milliseconds
    2016-06-03 12:33:28,976 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 256000 milliseconds
    2016-06-03 12:37:44,484 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 512000 milliseconds
    2016-06-03 12:37:44,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 512000 milliseconds
    2016-06-03 12:46:16,485 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 600000 milliseconds
    2016-06-03 12:46:16,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 600000 milliseconds
    2016-06-03 12:46:54,906 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Source: Custom Source (1/2)
    2016-06-03 12:46:54,907 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/2) switched to CANCELING
    2016-06-03 12:46:54,907 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: Custom Source (1/2) (dec8d24e486ca9937739b7c6e07fbb05).
    2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Cassandra Sink (1/2)
    2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Cassandra Sink (1/2) switched to CANCELING
    2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Cassandra Sink (1/2) (96511ef6293a893b0ef35dd211aea2b9).
    2016-06-03 12:46:55,389 INFO  com.dataartisans.Job                                          - Received cancel in EventGenerator
    2016-06-03 12:46:55,392 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/2) switched to CANCELED
    2016-06-03 12:46:55,392 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/2)
    2016-06-03 12:46:55,394 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source (dec8d24e486ca9937739b7c6e07fbb05)
    2016-06-03 12:47:24,911 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:47:54,912 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:48:24,913 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:48:54,915 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:49:24,916 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:49:54,918 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    
    2016-06-03 12:50:24,919 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    ```
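
    The ControlConnection lines in the log show the retry delay doubling from 1000 ms up to a 600000 ms cap, with no point at which the sink gives up. That pattern is consistent with a capped exponential backoff. A minimal sketch of such a backoff combined with an overall deadline after which the caller would fail; the class `RetryDeadline` and the parameter `giveUpAfterMs` are hypothetical and do not correspond to any driver or Flink API.

    ```java
    // Hypothetical sketch: these names are not part of the Cassandra driver or Flink.
    final class RetryDeadline {
        private final long baseDelayMs;    // first retry delay
        private final long maxDelayMs;     // cap for the retry delay
        private final long giveUpAfterMs;  // assumed overall failure deadline

        RetryDeadline(long baseDelayMs, long maxDelayMs, long giveUpAfterMs) {
            this.baseDelayMs = baseDelayMs;
            this.maxDelayMs = maxDelayMs;
            this.giveUpAfterMs = giveUpAfterMs;
        }

        /** Delay before retry number attempt (0-based): base * 2^attempt, capped. */
        long delayMs(int attempt) {
            if (attempt >= 62) {
                return maxDelayMs;  // avoid overflowing the shift below
            }
            long factor = 1L << attempt;
            // Compare via division so that base * factor cannot overflow.
            return baseDelayMs > maxDelayMs / factor ? maxDelayMs : baseDelayMs * factor;
        }

        /** True once the outage has lasted at least as long as the deadline. */
        boolean shouldFail(long firstFailureAtMs, long nowMs) {
            return nowMs - firstFailureAtMs >= giveUpAfterMs;
        }
    }
    ```

    With a base of 1000 ms and a cap of 600000 ms, `delayMs` reproduces the delays seen in the log; a sink could check `shouldFail` on every failed attempt and throw once it returns true.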


> Add a connector for streaming data into Cassandra
> -------------------------------------------------
>
>                 Key: FLINK-3311
>                 URL: https://issues.apache.org/jira/browse/FLINK-3311
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>            Reporter: Robert Metzger
>            Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed Java client for connecting to Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local Cassandra instance (for the tests): http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard Java types to the corresponding Cassandra types.
> In addition, it seems that there is an object mapper from DataStax to store POJOs in Cassandra (there are annotations for defining the primary key and types).


