Posted to issues@flink.apache.org by "Subhankar Biswas (JIRA)" <ji...@apache.org> on 2016/06/13 04:31:21 UTC

[jira] [Comment Edited] (FLINK-4051) RabbitMQ Source might not react to cancel signal

    [ https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15326810#comment-15326810 ] 

Subhankar Biswas edited comment on FLINK-4051 at 6/13/16 4:31 AM:
------------------------------------------------------------------

nextDelivery() uses the take() method of BlockingQueue, while nextDelivery(timeout) uses the poll() method.
{code:java}
// Code of take()
public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        // Waits with no time limit while the queue is empty
        while ((x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}
{code}
{code:java}
// Code of poll(timeout)
public E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ((x = unlinkFirst()) == null) {
            // Gives up and returns null once the timeout has elapsed
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}
{code}
So if the queue is empty, a thread calling take() stays in await() until an element arrives, whereas poll(timeout) returns null once the timeout expires.
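
To make the difference concrete, here is a minimal sketch using java.util.concurrent.LinkedBlockingDeque directly. The class name PollVsTake and the helper pollOnce are hypothetical names chosen for illustration; they are not part of Flink or the RabbitMQ client.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class PollVsTake {

    /**
     * Hypothetical helper: waits at most timeoutMillis for an element.
     * Returns null if the deque stays empty for the whole timeout
     * (or if the waiting thread is interrupted).
     */
    static String pollOnce(LinkedBlockingDeque<String> deque, long timeoutMillis) {
        try {
            return deque.pollFirst(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>();

        // Empty deque: the call gives up after the timeout and returns null.
        System.out.println("empty deque  -> " + pollOnce(deque, 50));

        // Non-empty deque: the element is returned without waiting.
        deque.addLast("msg");
        System.out.println("after add    -> " + pollOnce(deque, 50));

        // By contrast, deque.takeFirst() on the now-empty deque would wait
        // until some other thread adds an element.
    }
}
```

Because the timed variant always returns control to the caller, the caller gets a chance to check a cancellation flag between attempts.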



> RabbitMQ Source might not react to cancel signal
> ------------------------------------------------
>
>                 Key: FLINK-4051
>                 URL: https://issues.apache.org/jira/browse/FLINK-4051
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Robert Metzger
>            Assignee: Subhankar Biswas
>
> As reported here https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517, the RabbitMQ source might block forever / ignore the cancelling signal if it's listening to an empty queue.
> Fix: call nextDelivery() with a timeout.
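
The proposed fix can be sketched as a loop that polls with a timeout and re-checks a cancellation flag after every attempt. This is only an illustration under stated assumptions: a plain BlockingQueue stands in for the RabbitMQ consumer, and the names CancellableSource, running, deliveries, and stopsOnEmptyQueue are hypothetical, not taken from the Flink connector.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CancellableSource {

    // Set to false by cancel(); volatile so the polling thread sees the change.
    private volatile boolean running = true;

    // Stand-in (assumption) for the queue the RabbitMQ consumer reads from.
    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();

    /** Polls with a timeout so the loop can notice cancellation on an empty queue. */
    public void run(long timeoutMillis) throws InterruptedException {
        while (running) {
            String delivery = deliveries.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (delivery == null) {
                continue; // timed out: go back and re-check the running flag
            }
            System.out.println("received " + delivery);
        }
    }

    public void cancel() {
        running = false;
    }

    /** Runs the loop on an empty queue, cancels it, and reports whether the thread exited. */
    static boolean stopsOnEmptyQueue() {
        CancellableSource source = new CancellableSource();
        Thread worker = new Thread(() -> {
            try {
                source.run(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            Thread.sleep(100);   // let the worker wait on the empty queue
            source.cancel();
            worker.join(2000);   // generous upper bound for the loop to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped after cancel: " + stopsOnEmptyQueue());
    }
}
```

The timeout bounds how long cancellation can go unnoticed: a shorter value reacts faster at the cost of more wake-ups on an idle queue.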


