You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Pavel Molchanov (JIRA)" <ji...@apache.org> on 2019/04/02 15:14:02 UTC

[jira] [Updated] (ARTEMIS-2293) addPacket method in the org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl doesn't notify threads in case of an Exception

     [ https://issues.apache.org/jira/browse/ARTEMIS-2293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pavel Molchanov updated ARTEMIS-2293:
-------------------------------------
    Description: 
Block that handles exceptions in the catch(Exception e) doesn't call notifyAll(). That cause that other working threads are not released in the waitCompletion method.

[https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java]

 

addPacket method:
{code:java}
public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
	int flowControlCredit = 0;
	
	synchronized (this) {
	packetAdded = true;
	if (outStream != null) {
	try {
	if (!isContinues) {
	streamEnded = true;
	}
	
	if (fileCache != null) {
	fileCache.cachePackage(chunk);
	}
	
	outStream.write(chunk);
	
	flowControlCredit = flowControlSize;
	
	notifyAll();
	
	if (streamEnded) {
	outStream.close();
	}
	} catch (Exception e) {
	ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
	handledException = e;
	}
	} else {
	if (fileCache != null) {
	try {
	fileCache.cachePackage(chunk);
	} catch (Exception e) {
	ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
	handledException = e;
	}
	}
	
	largeMessageData.offer(new LargeData(chunk, flowControlSize, isContinues));
	}
	}{code}
 

waitCompletion method:
{code:java}
public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException {
	if (outStream == null) {
	// There is no stream.. it will never achieve the end of streaming
	return false;
	}
	
	long timeOut;
	
	// If timeWait = 0, we will use the readTimeout
	// And we will check if no packets have arrived within readTimeout milliseconds
	if (timeWait != 0) {
	timeOut = System.currentTimeMillis() + timeWait;
	} else {
	timeOut = System.currentTimeMillis() + readTimeout;
	}
	
	while (!streamEnded && handledException == null) {
	try {
	this.wait(timeWait == 0 ? readTimeout : timeWait);
	} catch (InterruptedException e) {
	throw new ActiveMQInterruptedException(e);
	}
	
	if (!streamEnded && handledException == null) {
	if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
	throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
	} else if (System.currentTimeMillis() > timeOut && !packetAdded) {
	throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
	}
	}
	}
	
	checkException();
	
	return streamEnded;
	
	}{code}
 

 

  was:
Block that handles exceptions in the catch(Exception e) doesn't call notifyAll(). That cause that other working threads are not released in the waitCompletion method.

 

addPacket method:
{code:java}
public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
	int flowControlCredit = 0;
	
	synchronized (this) {
	packetAdded = true;
	if (outStream != null) {
	try {
	if (!isContinues) {
	streamEnded = true;
	}
	
	if (fileCache != null) {
	fileCache.cachePackage(chunk);
	}
	
	outStream.write(chunk);
	
	flowControlCredit = flowControlSize;
	
	notifyAll();
	
	if (streamEnded) {
	outStream.close();
	}
	} catch (Exception e) {
	ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
	handledException = e;
	}
	} else {
	if (fileCache != null) {
	try {
	fileCache.cachePackage(chunk);
	} catch (Exception e) {
	ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
	handledException = e;
	}
	}
	
	largeMessageData.offer(new LargeData(chunk, flowControlSize, isContinues));
	}
	}{code}
 

waitCompletion method:
{code:java}

public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException {
	if (outStream == null) {
	// There is no stream.. it will never achieve the end of streaming
	return false;
	}
	
	long timeOut;
	
	// If timeWait = 0, we will use the readTimeout
	// And we will check if no packets have arrived within readTimeout milliseconds
	if (timeWait != 0) {
	timeOut = System.currentTimeMillis() + timeWait;
	} else {
	timeOut = System.currentTimeMillis() + readTimeout;
	}
	
	while (!streamEnded && handledException == null) {
	try {
	this.wait(timeWait == 0 ? readTimeout : timeWait);
	} catch (InterruptedException e) {
	throw new ActiveMQInterruptedException(e);
	}
	
	if (!streamEnded && handledException == null) {
	if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
	throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
	} else if (System.currentTimeMillis() > timeOut && !packetAdded) {
	throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
	}
	}
	}
	
	checkException();
	
	return streamEnded;
	
	}{code}
 

 


> addPacket method in the org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl  doesn't notify threads in case of an Exception
> -----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-2293
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2293
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.6.4
>            Reporter: Pavel Molchanov
>            Priority: Major
>
> Block that handles exceptions in the catch(Exception e) doesn't call notifyAll(). That cause that other working threads are not released in the waitCompletion method.
> [https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java]
>  
> addPacket method:
> {code:java}
> public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
> 	int flowControlCredit = 0;
> 	
> 	synchronized (this) {
> 	packetAdded = true;
> 	if (outStream != null) {
> 	try {
> 	if (!isContinues) {
> 	streamEnded = true;
> 	}
> 	
> 	if (fileCache != null) {
> 	fileCache.cachePackage(chunk);
> 	}
> 	
> 	outStream.write(chunk);
> 	
> 	flowControlCredit = flowControlSize;
> 	
> 	notifyAll();
> 	
> 	if (streamEnded) {
> 	outStream.close();
> 	}
> 	} catch (Exception e) {
> 	ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
> 	handledException = e;
> 	}
> 	} else {
> 	if (fileCache != null) {
> 	try {
> 	fileCache.cachePackage(chunk);
> 	} catch (Exception e) {
> 	ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
> 	handledException = e;
> 	}
> 	}
> 	
> 	largeMessageData.offer(new LargeData(chunk, flowControlSize, isContinues));
> 	}
> 	}{code}
>  
> waitCompletion method:
> {code:java}
> public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException {
> 	if (outStream == null) {
> 	// There is no stream.. it will never achieve the end of streaming
> 	return false;
> 	}
> 	
> 	long timeOut;
> 	
> 	// If timeWait = 0, we will use the readTimeout
> 	// And we will check if no packets have arrived within readTimeout milliseconds
> 	if (timeWait != 0) {
> 	timeOut = System.currentTimeMillis() + timeWait;
> 	} else {
> 	timeOut = System.currentTimeMillis() + readTimeout;
> 	}
> 	
> 	while (!streamEnded && handledException == null) {
> 	try {
> 	this.wait(timeWait == 0 ? readTimeout : timeWait);
> 	} catch (InterruptedException e) {
> 	throw new ActiveMQInterruptedException(e);
> 	}
> 	
> 	if (!streamEnded && handledException == null) {
> 	if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
> 	throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
> 	} else if (System.currentTimeMillis() > timeOut && !packetAdded) {
> 	throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
> 	}
> 	}
> 	}
> 	
> 	checkException();
> 	
> 	return streamEnded;
> 	
> 	}{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)