Posted to issues@flink.apache.org by "Subhankar Biswas (JIRA)" <ji...@apache.org> on 2016/06/13 04:31:21 UTC
[jira] [Comment Edited] (FLINK-4051) RabbitMQ Source might not react to cancel signal
[ https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15326810#comment-15326810 ]
Subhankar Biswas edited comment on FLINK-4051 at 6/13/16 4:31 AM:
------------------------------------------------------------------
nextDelivery() uses the take() method of BlockingQueue, while nextDelivery(timeout) uses the poll() method.
{code:java}
// Code of take
public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        // Waits without a timeout until an element becomes available.
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}
{code}
{code:java}
// Code of poll
public E pollFirst(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkFirst()) == null) {
            // Timeout elapsed with nothing in the queue: give up and return null.
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}
{code}
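So if the queue is empty, a thread calling take() stays in await mode until an element arrives, while poll() with a timeout returns null once the timeout has elapsed.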
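The difference can be reproduced with a plain java.util.concurrent queue. This is only a minimal sketch: the class EmptyQueueDemo and its method names are made up for illustration, and a LinkedBlockingQueue stands in for whatever queue the RabbitMQ consumer holds internally.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Flink or the RabbitMQ client.
public class EmptyQueueDemo {

    /** Returns true if a timed poll() on an empty queue gives up and returns null. */
    static boolean timedPollReturnsNull(long timeoutMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Returns true if a thread calling take() on an empty queue is still
     * alive after waitMillis (waitMillis must be greater than 0).
     */
    static boolean takeStillBlocked(long waitMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread taker = new Thread(() -> {
            try {
                queue.take(); // nothing is ever offered, so this keeps waiting
            } catch (InterruptedException e) {
                // interrupted by the cleanup below: just exit
            }
        });
        taker.setDaemon(true);
        taker.start();
        try {
            taker.join(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean blocked = taker.isAlive();
        taker.interrupt(); // release the waiting thread
        return blocked;
    }

    public static void main(String[] args) {
        System.out.println("poll(timeout) returned null: " + timedPollReturnsNull(100));
        System.out.println("take() still blocked: " + takeStillBlocked(200));
    }
}
```

In this sketch the take() thread only ends because it is interrupted explicitly. A flag that the thread never gets to re-check would not stop it.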
was (Author: neo20iitkgp):
{code:java}nextDelivery(){code} use take() method of BlockingQueue, while nextDelivery(timeout) use poll() method.
{code:java}
//Code of take
public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}
{code}
{code:java}
//Code of Poll
public E pollFirst(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkFirst()) == null) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}
{code}
So if the queue is empty the thread will be await mode.
> RabbitMQ Source might not react to cancel signal
> ------------------------------------------------
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Reporter: Robert Metzger
> Assignee: Subhankar Biswas
>
> As reported here https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517, the RabbitMQ source might block forever and ignore the cancel signal if it's listening to an empty queue.
> Fix: call nextDelivery() with a timeout.
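A rough sketch of what the proposed fix could look like, under stated assumptions: CancellableLoopSketch, its running flag, and the 100 ms timeout are hypothetical, and a BlockingQueue's poll(timeout) stands in for nextDelivery(timeout). It is not the actual Flink RMQSource code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a source; not the real connector implementation.
public class CancellableLoopSketch {
    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private int consumed = 0;

    /** Source loop: wakes up at least every 100 ms to re-check the cancel flag. */
    void run() throws InterruptedException {
        while (running) {
            // Stand-in for nextDelivery(timeout): null means the timeout elapsed.
            String delivery = deliveries.poll(100, TimeUnit.MILLISECONDS);
            if (delivery == null) {
                continue; // empty queue: loop around and look at 'running' again
            }
            consumed++; // a real source would emit the record here
        }
    }

    /** Cancel signal: only flips the flag, no interrupt needed. */
    void cancel() {
        running = false;
    }

    /** Returns true if the loop exits on an empty queue once cancel() is called. */
    static boolean stopsAfterCancel() {
        CancellableLoopSketch source = new CancellableLoopSketch();
        Thread worker = new Thread(() -> {
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        source.cancel();
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped after cancel: " + stopsAfterCancel());
    }
}
```

The timeout is a trade-off: a shorter value makes the loop react to cancellation faster, at the cost of more wake-ups while the queue is empty.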