Posted to issues@flink.apache.org by "Chesnay Schepler (Jira)" <ji...@apache.org> on 2019/09/03 09:49:00 UTC

[jira] [Assigned] (FLINK-13059) Cassandra Connector leaks Semaphore on Exception; hangs on close

     [ https://issues.apache.org/jira/browse/FLINK-13059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler reassigned FLINK-13059:
----------------------------------------

    Assignee: Mads Chr. Olesen

> Cassandra Connector leaks Semaphore on Exception; hangs on close
> ----------------------------------------------------------------
>
>                 Key: FLINK-13059
>                 URL: https://issues.apache.org/jira/browse/FLINK-13059
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Cassandra
>    Affects Versions: 1.8.0
>            Reporter: Mads Chr. Olesen
>            Assignee: Mads Chr. Olesen
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In CassandraSinkBase the following code is present (comments are mine):
>  
> {code:java}
> public void invoke(IN value) throws Exception {
>    checkAsyncErrors();
>    tryAcquire();
>    //Semaphore held here
>    final ListenableFuture<V> result = send(value);
>    Futures.addCallback(result, callback); //Callback releases semaphore
> }{code}
> Any exception thrown synchronously inside send(value) leaves the semaphore permit unreleased, because the callback that would release it is never registered. Such exceptions do occur, e.g.:
> {code:java}
> com.datastax.driver.core.exceptions.InvalidQueryException: Some partition key parts are missing: hest
> at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
> at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:98)
> at com.datastax.driver.mapping.Mapper.getPreparedQuery(Mapper.java:118)
> at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:201)
> at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:163)
> at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.send(CassandraPojoSink.java:128)
> at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.invoke(CassandraSinkBase.java:131)
> at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> {code}
> When the exception propagates and causes the job to shut down, CassandraSinkBase.flush() is eventually called. Because the permit was never released, flush() deadlocks trying to acquire config.getMaxConcurrentRequests() permits from a semaphore that has one fewer than that available.
> The Flink job is thus left half-closed but still marked as "RUNNING". Checkpointing, however, fails with:
> {noformat}
> INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 201325 of job XXX. org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: XXX (3/4) was not running
> {noformat}
>  
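
The leak described above, and one possible way to avoid it, can be reproduced without Flink or the Cassandra driver. The following is only a minimal sketch under stated assumptions: the class name, MAX_CONCURRENT_REQUESTS, invokeLeaky, invokeSafe and the simplified send/flush are all hypothetical stand-ins, CompletableFuture replaces Guava's ListenableFuture, and flush uses a bounded wait so the demo reports the stall instead of hanging. It is not the actual patch for this issue.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreLeakSketch {
    // Hypothetical stand-in for config.getMaxConcurrentRequests().
    static final int MAX_CONCURRENT_REQUESTS = 4;

    final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS);

    // Hypothetical stand-in for send(value): fails synchronously on bad input,
    // the way Mapper.saveQuery does in the stack trace above.
    CompletableFuture<Void> send(String value) {
        if (value == null) {
            throw new IllegalArgumentException("Some partition key parts are missing");
        }
        return CompletableFuture.completedFuture(null);
    }

    // Mirrors the reported invoke(): if send() throws, the callback is never
    // registered and the acquired permit is never returned.
    void invokeLeaky(String value) throws InterruptedException {
        semaphore.acquire();
        CompletableFuture<Void> result = send(value);
        result.whenComplete((r, t) -> semaphore.release());
    }

    // One possible fix: release the permit before rethrowing a synchronous failure.
    void invokeSafe(String value) throws InterruptedException {
        semaphore.acquire();
        CompletableFuture<Void> result;
        try {
            result = send(value);
        } catch (RuntimeException e) {
            semaphore.release();
            throw e;
        }
        result.whenComplete((r, t) -> semaphore.release());
    }

    // Simplified flush(): needs every permit back. Returns false on timeout,
    // where an unbounded acquire would block forever.
    boolean flush(long timeoutMillis) throws InterruptedException {
        boolean acquired = semaphore.tryAcquire(
                MAX_CONCURRENT_REQUESTS, timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            semaphore.release(MAX_CONCURRENT_REQUESTS);
        }
        return acquired;
    }

    public static void main(String[] args) throws Exception {
        SemaphoreLeakSketch leaky = new SemaphoreLeakSketch();
        try {
            leaky.invokeLeaky(null);
        } catch (IllegalArgumentException expected) {
            // The permit acquired in invokeLeaky is now lost.
        }
        System.out.println("leaky flush completed: " + leaky.flush(100)); // false

        SemaphoreLeakSketch safe = new SemaphoreLeakSketch();
        try {
            safe.invokeSafe(null);
        } catch (IllegalArgumentException expected) {
            // The permit was released before the exception was rethrown.
        }
        System.out.println("safe flush completed: " + safe.flush(100)); // true
    }
}
```

With the leaky variant, a single failed send leaves only MAX_CONCURRENT_REQUESTS - 1 permits, so flush can never collect all of them. Releasing in the catch path restores the count before the exception reaches the caller.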



--
This message was sent by Atlassian Jira
(v8.3.2#803003)