You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Pavel Molchanov (JIRA)" <ji...@apache.org> on 2019/04/02 15:14:02 UTC
[jira] [Updated] (ARTEMIS-2293) addPacket method in the
org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl
doesn't notify threads in case of an Exception
[ https://issues.apache.org/jira/browse/ARTEMIS-2293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pavel Molchanov updated ARTEMIS-2293:
-------------------------------------
Description:
The catch (Exception e) block in addPacket does not call notifyAll(). As a result, other threads waiting in the waitCompletion method are not released.
[https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java]
addPacket method:
{code:java}
/**
 * Accepts one chunk of a large message.
 *
 * Excerpt only: the snippet stops after the synchronized block, so the method's closing
 * brace and any later use of flowControlCredit are not shown here.
 *
 * @param chunk bytes of this packet
 * @param flowControlSize copied into flowControlCredit after a successful write, or passed to
 *        LargeData when the chunk is buffered
 * @param isContinues false marks this chunk as the last one (streamEnded is set)
 */
public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
int flowControlCredit = 0;
synchronized (this) {
// waitCompletion() reads this flag when deciding whether a timeout has occurred.
packetAdded = true;
if (outStream != null) {
try {
if (!isContinues) {
streamEnded = true;
}
if (fileCache != null) {
fileCache.cachePackage(chunk);
}
outStream.write(chunk);
flowControlCredit = flowControlSize;
// Success path only: wakes threads blocked in this.wait(...) inside waitCompletion().
notifyAll();
if (streamEnded) {
outStream.close();
}
} catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
handledException = e;
// NOTE(review): no notifyAll() on this path -- this is the reported defect. A thread
// blocked in waitCompletion() is not woken here, so it presumably notices
// handledException only once its timed wait(...) returns.
}
} else {
// No output stream set: optionally cache the chunk, then hand it to largeMessageData.
if (fileCache != null) {
try {
fileCache.cachePackage(chunk);
} catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
handledException = e;
}
}
largeMessageData.offer(new LargeData(chunk, flowControlSize, isContinues));
}
}{code}
waitCompletion method:
{code:java}
/**
 * Blocks the calling thread until streamEnded is set or handledException is recorded.
 *
 * @param timeWait maximum time to wait in milliseconds; 0 means readTimeout is used instead,
 *        and the timeout is only raised while packetAdded is false
 * @return false immediately when outStream is null; otherwise the value of streamEnded
 * @throws ActiveMQException the timeoutOnLargeMessage error when the deadline passes;
 *         checkException() may also throw (presumably rethrowing handledException -- its
 *         body is not shown in this excerpt)
 */
public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException {
if (outStream == null) {
// There is no stream.. it will never achieve the end of streaming
return false;
}
// Absolute deadline in System.currentTimeMillis() terms, computed once before the loop.
long timeOut;
// If timeWait = 0, we will use the readTimeout
// And we will check if no packets have arrived within readTimeout milliseconds
if (timeWait != 0) {
timeOut = System.currentTimeMillis() + timeWait;
} else {
timeOut = System.currentTimeMillis() + readTimeout;
}
// Loop ends when the stream has ended or an exception has been stored in handledException.
while (!streamEnded && handledException == null) {
try {
// Timed wait on this object's monitor; addPacket() calls notifyAll() on the same monitor.
this.wait(timeWait == 0 ? readTimeout : timeWait);
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
// Still neither finished nor failed after waking: check whether the deadline has passed.
if (!streamEnded && handledException == null) {
if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
// Explicit timeWait: time out regardless of whether packets have arrived.
throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
} else if (System.currentTimeMillis() > timeOut && !packetAdded) {
// timeWait == 0 case: time out only if packetAdded is still false.
throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
}
}
}
checkException();
return streamEnded;
}{code}
was:
Block that handles exceptions in the catch(Exception e) doesn't call notifyAll(). That cause that other working threads are not released in the waitCompletion method.
addPacket method:
{code:java}
public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
int flowControlCredit = 0;
synchronized (this) {
packetAdded = true;
if (outStream != null) {
try {
if (!isContinues) {
streamEnded = true;
}
if (fileCache != null) {
fileCache.cachePackage(chunk);
}
outStream.write(chunk);
flowControlCredit = flowControlSize;
notifyAll();
if (streamEnded) {
outStream.close();
}
} catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
handledException = e;
}
} else {
if (fileCache != null) {
try {
fileCache.cachePackage(chunk);
} catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
handledException = e;
}
}
largeMessageData.offer(new LargeData(chunk, flowControlSize, isContinues));
}
}{code}
waitCompletion method:
{code:java}
public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException {
if (outStream == null) {
// There is no stream.. it will never achieve the end of streaming
return false;
}
long timeOut;
// If timeWait = 0, we will use the readTimeout
// And we will check if no packets have arrived within readTimeout milliseconds
if (timeWait != 0) {
timeOut = System.currentTimeMillis() + timeWait;
} else {
timeOut = System.currentTimeMillis() + readTimeout;
}
while (!streamEnded && handledException == null) {
try {
this.wait(timeWait == 0 ? readTimeout : timeWait);
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
if (!streamEnded && handledException == null) {
if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
} else if (System.currentTimeMillis() > timeOut && !packetAdded) {
throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
}
}
}
checkException();
return streamEnded;
}{code}
> addPacket method in the org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl doesn't notify threads in case of an Exception
> -----------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: ARTEMIS-2293
> URL: https://issues.apache.org/jira/browse/ARTEMIS-2293
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.6.4
> Reporter: Pavel Molchanov
> Priority: Major
>
> The catch (Exception e) block in addPacket does not call notifyAll(). As a result, other threads waiting in the waitCompletion method are not released.
> [https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java]
>
> addPacket method:
> {code:java}
> public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
> int flowControlCredit = 0;
>
> synchronized (this) {
> packetAdded = true;
> if (outStream != null) {
> try {
> if (!isContinues) {
> streamEnded = true;
> }
>
> if (fileCache != null) {
> fileCache.cachePackage(chunk);
> }
>
> outStream.write(chunk);
>
> flowControlCredit = flowControlSize;
>
> notifyAll();
>
> if (streamEnded) {
> outStream.close();
> }
> } catch (Exception e) {
> ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
> handledException = e;
> }
> } else {
> if (fileCache != null) {
> try {
> fileCache.cachePackage(chunk);
> } catch (Exception e) {
> ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
> handledException = e;
> }
> }
>
> largeMessageData.offer(new LargeData(chunk, flowControlSize, isContinues));
> }
> }{code}
>
> waitCompletion method:
> {code:java}
> public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException {
> if (outStream == null) {
> // There is no stream.. it will never achieve the end of streaming
> return false;
> }
>
> long timeOut;
>
> // If timeWait = 0, we will use the readTimeout
> // And we will check if no packets have arrived within readTimeout milliseconds
> if (timeWait != 0) {
> timeOut = System.currentTimeMillis() + timeWait;
> } else {
> timeOut = System.currentTimeMillis() + readTimeout;
> }
>
> while (!streamEnded && handledException == null) {
> try {
> this.wait(timeWait == 0 ? readTimeout : timeWait);
> } catch (InterruptedException e) {
> throw new ActiveMQInterruptedException(e);
> }
>
> if (!streamEnded && handledException == null) {
> if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
> throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
> } else if (System.currentTimeMillis() > timeOut && !packetAdded) {
> throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
> }
> }
> }
>
> checkException();
>
> return streamEnded;
>
> }{code}
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)