Posted to users@qpid.apache.org by Abhishek Kumar <ma...@gmail.com> on 2018/06/06 19:11:36 UTC

producer exception handling for 10m idle TimeOut -> "qpid-jms-client - 0.32.0"

Hi Robbie,

As you are aware, we are running into the "The MessageProducer is closed"
error. This is the default behaviour of ServiceBus.

I have implemented exception handling for this scenario. Please find a
sample code snippet below.


I have two questions about the implementation. Details below.

1. LINE (1) --> Whenever the MessageProducer is closed, I get an
exception in the catch block at "LINE (2)" in the code below. I can't
resend the same "jmsMessage" object because its status has changed to
"is read only".

To handle this, I store the original object at "LINE (1)" and send the
"origionalJmsMessage" object instead.

I am maintaining two copies of jmsMessage, which does not seem ideal. Am I
doing this correctly?

Could you please suggest a better way, or confirm that my approach is
fine?


2. LINE (3) --> We are using the ServiceBus broker, so to handle the 10m
idle timeout we need to use a CompletionListener. As per your explanation
in an earlier response, we can receive an exception in "onException()"
while sending the message to the broker after some steps (for example,
after the isClosed() validation).

During testing, I received the error "Send failed due to connection loss"
from Qpid.

Could you suggest what we should do in the "onException()" block? Should
we reset the session and producer?

If you have any link, guideline or pattern describing the types of
exception to expect while sending a message, it would help us understand
the scenario.



////// Code /////

public class Sender {

    private CompletionHandler completionHandler = new CompletionHandler();

    public void sendMessage(final Message jmsMessage) throws JMSException {
        // Store original JmsMessage which will be helpful for
        // re-sending message in case of closed MessageProducer
        // LINE (1) - Need to maintain original message for re-send
        final Message origionalJmsMessage = jmsMessage;
        try {
            producer.send(jmsMessage, completionHandler);
        } catch (IllegalStateException illegalStateException) { // LINE (2)
            if (illegalStateException.getMessage().contains("The MessageProducer is closed")) {
                // Reset Producer due to Idle TimeOut after 10m
                producer = getProducer(session, endpoint);
                // Cannot perform producer.send(jmsMessage, completionHandler) here
                // because jmsMessage is read only, so we can't send it again
                producer.send(origionalJmsMessage, completionHandler);
            } else {
                throw illegalStateException;
            }
        } catch (Exception exception) {
            logger.error("not able to publish message :: exception Message :: "
                    + exception.getMessage());
            exception.printStackTrace();
            // throw back exception, so client can handle this
            throw exception;
        }
    }

    class CompletionHandler implements CompletionListener {

        @Override
        public void onCompletion(Message message) {
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("Send message completed for JMSMessageID :: "
                            + message.getJMSMessageID());
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        // LINE (3) - What is the standard practice to handle this?
        @Override
        public void onException(Message message, Exception exception) {
            try {
                logger.error("onException :: failed to send message for JMSMessageID :: "
                        + message.getJMSMessageID() + ", Exception Message :: "
                        + exception.getMessage());
                exception.printStackTrace();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

Regards,
Abhishek Kumar

Re: producer exception handling for 10m idle TimeOut -> "qpid-jms-client - 0.32.0"

Posted by Robbie Gemmell <ro...@gmail.com>.
Your expectation was incorrect. That config toggle existed from before
the client's implementation of JMS 2.0 and only controls the behaviour of
send calls made without a CompletionListener in place. Specifically,
it forces sync sends for non-persistent messages and inside
transactions. Explicitly async sends using a CompletionListener are
*always* async. The CompletionListener is always fired from another
thread, as required by the JMS spec, once a result is received from
the server or once it is known none will be.

The difference between sync and async is whether the send method
returns before a final result for the send (e.g. message accepted,
message rejected) is known from the server.

On 8 June 2018 at 17:03, akabhishek1 <ma...@gmail.com> wrote:
> Hi Robbie,
> While using *forceSyncSend=true* together with a CompletionListener, I
> would expect the callback to arrive on the same thread that invoked
> send(). However, in my observation, I send the message on the main thread
> and get the response on the
> "JmsSession [ID:12c9fd67-aed5-4930-8da2-31cb3f99729d:1:1] completion dispatcher"
> thread, which means any exception arising from the send() operation will
> not be propagated to the main thread.
>
> *Is this behavior correct?*
>
> Does that mean the only difference in behavior between sync and async is
> the callback to onCompletion() or onException() and nothing more (as both
> run on their own threads), and that only the server-side code can see the
> notification/exception but cannot propagate it to the client?

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: producer exception handling for 10m idle TimeOut -> "qpid-jms-client - 0.32.0"

Posted by akabhishek1 <ma...@gmail.com>.
Hi Robbie,

While using *forceSyncSend=true* together with a CompletionListener, I
would expect the callback to arrive on the same thread that invoked
send(). However, in my observation, I send the message on the main thread
and get the response on the
"JmsSession [ID:12c9fd67-aed5-4930-8da2-31cb3f99729d:1:1] completion dispatcher"
thread, which means any exception arising from the send() operation will
not be propagated to the main thread.

*Is this behavior correct?*

Does that mean the only difference in behavior between sync and async is
the callback to onCompletion() or onException() and nothing more (as both
run on their own threads), and that only the server-side code can see the
notification/exception but cannot propagate it to the client?

Attached is the code -  TestQpidSendSync.java
<http://qpid.2158936.n2.nabble.com/file/t396358/TestQpidSendSync.java>

/Steps to reproduce/ -
1. Start the main method.
2. Terminate the connection to the service bus while the main thread is
running.
3. The client will retry a couple of times before reporting exceptions to
the CompletionListener's onException().



--
Sent from: http://qpid.2158936.n2.nabble.com/Apache-Qpid-users-f2158936.html

Re: producer exception handling for 10m idle TimeOut -> "qpid-jms-client - 0.32.0"

Posted by akabhishek1 <ma...@gmail.com>.
Hi Robbie,

Good morning! Thank you so much for both of your replies. They have cleared
up our doubts, and we have reached a conclusion on reliability. We have
chosen "forceSync + send(without listener) + failover(if required)".

This gives us about 1/3 of the performance of async sends, but it is the
better choice from a reliability point of view.
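
For illustration, the connection URI for that combination might be assembled roughly as below. This is only a sketch: it assumes the same failover URI layout as the TestQpidSend program in this thread, with jms.forceAsyncSend swapped for jms.forceSyncSend. The class name SyncSendUri, the build() method and the "my-namespace" value are placeholders, not anything from our real code.

```java
// Sketch only: builds a qpid-jms failover URI that forces synchronous sends.
// SyncSendUri, build() and "my-namespace" are hypothetical placeholders.
final class SyncSendUri {

    static String build(String namespace) {
        // transport.* options sit inside the parentheses, on the broker URI itself.
        String broker = "amqps://" + namespace + ".servicebus.windows.net"
                + "?transport.tcpKeepAlive=true";
        // failover.* and jms.* options follow the closing parenthesis.
        return "failover:(" + broker + ")"
                + "?failover.reconnectDelay=2000"
                + "&failover.maxReconnectAttempts=-1"
                + "&jms.forceSyncSend=true";
    }

    public static void main(String[] args) {
        System.out.println(build("my-namespace"));
    }
}
```

The resulting string would be used as the value of the "connectionfactory.SBCF" JNDI property, in the same way as in the test program.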

Regards,
Abhishek Kumar





Re: producer exception handling for 10m idle TimeOut -> "qpid-jms-client - 0.32.0"

Posted by Robbie Gemmell <ro...@gmail.com>.
I think you need to step back and establish the reliability guarantees
you need first. On the one hand you are looking for exceptions to be
indicated per send, and on the other you are forcing async sends via
the synchronous send method, which doesn't guarantee reliability at its
most basic level.

It feels more like you should be using synchronous sends, or, if you
need additional perf and can't use multiple producers to get it, then
batching asynchronous sends with the completion listener and actually
monitoring the results.
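
As a rough illustration of monitoring the results of a batch, here is a sketch that uses only JDK classes. BatchTracker is a hypothetical helper, not part of JMS or qpid-jms: in a real sender the CompletionListener's onCompletion() and onException() would call succeeded() and failed(), and the executor in main() merely stands in for the client's completion thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch only: tracks the outcome of a batch of asynchronous sends.
// BatchTracker and its methods are hypothetical names, not qpid-jms API.
final class BatchTracker {
    private final CountDownLatch pending;
    private final List<Exception> failures = new CopyOnWriteArrayList<>();

    BatchTracker(int batchSize) {
        pending = new CountDownLatch(batchSize);
    }

    // Intended to be called from CompletionListener.onCompletion(Message).
    void succeeded() {
        pending.countDown();
    }

    // Intended to be called from CompletionListener.onException(Message, Exception).
    void failed(Exception cause) {
        failures.add(cause);
        pending.countDown();
    }

    // Blocks the sending thread until every send has reported a result.
    // Returns the failures seen; throws if some sends never reported in time.
    List<Exception> await(long timeout, TimeUnit unit) {
        try {
            if (!pending.await(timeout, unit)) {
                throw new IllegalStateException(pending.getCount() + " sends still unresolved");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for sends", e);
        }
        return failures;
    }

    public static void main(String[] args) {
        // Stand-in for the client's completion thread; every third "send" fails.
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        BatchTracker tracker = new BatchTracker(6);
        for (int i = 1; i <= 6; i++) {
            final int n = i;
            dispatcher.execute(() -> {
                if (n % 3 == 0) {
                    tracker.failed(new Exception("send " + n + " failed"));
                } else {
                    tracker.succeeded();
                }
            });
        }
        List<Exception> failures = tracker.await(5, TimeUnit.SECONDS);
        dispatcher.shutdown();
        System.out.println("failed sends: " + failures.size()); // prints "failed sends: 2"
    }
}
```

What to do with the reported failures (resend, log, surface to the caller) depends on the reliability guarantees the application needs.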

On 8 June 2018 at 12:24, akabhishek1 <ma...@gmail.com> wrote:
> Hi Robbie,
>
> Thanks a lot for your reply and clarification. You are right about the
> threading issue with "onException()". I am still experimenting with your
> suggestion and will come back to you shortly with results.
>
> This morning at 10:55AM (UTC+1), I checked out the master code and
> installed the "0.33.0-SNAPSHOT" version locally.
>
> While testing, I found an issue where messages are lost and no error is
> exposed to the application. I think you should take a look before
> releasing version "0.33.0".
>
> Issue - Messages are lost when the connection drops, and errors do not
> bubble up to the application level for a failover connection.



Re: producer exception handling for 10m idle TimeOut -> "qpid-jms-client - 0.32.0"

Posted by akabhishek1 <ma...@gmail.com>.
Hi Robbie,

Thanks a lot for your reply and clarification. You are right about the
threading issue with "onException()". I am still experimenting with your
suggestion and will come back to you shortly with results.

This morning at 10:55AM (UTC+1), I checked out the master code and
installed the "0.33.0-SNAPSHOT" version locally.

While testing, I found an issue where messages are lost and no error is
exposed to the application. I think you should take a look before
releasing version "0.33.0".

Issue - Messages are lost when the connection drops, and errors do not
bubble up to the application level for a failover connection.

Steps to reproduce the issue (Note - sending messages asynchronously
without a CompletionListener, with a failover connection):
1. Set valid QUEUE_NAME, SBUS_NAME, USERNAME and PASSWORD values in the
example below.
2. Run the Java application.
3. Every 5s (due to "Thread.sleep(5000)") one message is published to
ServiceBus.
4. Disconnect the LAN cable/internet after 1 or 2 messages have been
published, so there is no connection between your desktop/laptop and
ServiceBus.
5. Wait for 3-4 minutes. You can see that the “for loop” continues sending
the next 4 messages.
   After sending 4 messages, the for loop goes into a suspended/wait state
(the thread is in a suspended/wait state), so no further processing
happens.
6. Wait for 5 minutes.
7. Reconnect the cable/internet. Once connected, the for loop starts
sending messages again (the thread resumes).
8. Wait for 1-2 minutes until you see the message "**** All message sent
successfully ****".
9. At the end of processing, we expect 30 messages to be available on the
queue.
   ISSUE - We see only 26 messages on the queue, with no exception at the
application level.
           So 4 messages have been lost. I waited up to 30m but the lost
messages never arrived on the queue.

NOTE: - I have also tested without connection failover. I get an
exception in "onException()" but still lose 4 messages. I think we can’t
do anything here because we don’t have failover.

Could you please take a look at this issue and let me know if you have any
questions?

//// CODE ////
package org.test;

import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class TestQpidSend implements ExceptionListener {

	private static final String QUEUE_NAME = "XXXXXXXX";
	private static final String SBUS_NAME = "XXXXXXXX";
	private static final String USERNAME = "XXXXXXXX";
	private static final String PASSWORD = "XXXXXXXX";
	private static final String QPID_CONNECTION_FACTORY_CLASS =
			"org.apache.qpid.jms.jndi.JmsInitialContextFactory";

	public static void main(String[] args) throws Exception {
		TestQpidSend test = new TestQpidSend();
		// send() blocks until all 30 messages have been handed to the client
		test.send();
	}

	private void send() throws NamingException, JMSException, InterruptedException {
		Hashtable<String, String> hashtable = new Hashtable<>();
		// Failover connection with forced asynchronous sends
		hashtable.put("connectionfactory.SBCF",
				"failover:(amqps://" + SBUS_NAME + ".servicebus.windows.net"
				+ "?transport.tcpKeepAlive=true&amqp.traceFrames=true)"
				+ "?failover.reconnectDelay=2000&failover.maxReconnectAttempts=-1"
				+ "&failover.warnAfterReconnectAttempts=10&failover.startupMaxReconnectAttempts=3"
				+ "&jms.prefetchPolicy.all=1000&jms.forceAsyncSend=true");

		// Variant without failover:
		//hashtable.put("connectionfactory.SBCF",
		//		"amqps://" + SBUS_NAME + ".servicebus.windows.net"
		//		+ "?transport.tcpKeepAlive=true&amqp.traceFrames=true"
		//		+ "&jms.prefetchPolicy.all=1000&jms.forceAsyncSend=true");

		hashtable.put(Context.INITIAL_CONTEXT_FACTORY, QPID_CONNECTION_FACTORY_CLASS);

		Context context = new InitialContext(hashtable);
		ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("SBCF");

		Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
		connection.setExceptionListener(this); // Set ExceptionListener
		connection.start();

		Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

		System.out.println("createSession :: " + session);

		Destination destination = session.createQueue(QUEUE_NAME);
		//System.out.println("**** Destination created ****");

		MessageProducer messageProducer = session.createProducer(destination);

		// Publish one message every 5 seconds, 30 in total
		for (int i = 0; i < 30; i++) {

			TextMessage textMessage = session.createTextMessage("Hello");

			System.out.println("**** sending ****");

			// No CompletionListener: send failures are not reported per message
			messageProducer.send(textMessage);

			System.out.println("**** sent ****");

			Thread.sleep(5000);
		}

		System.out.println("**** All message sent successfully ****");
	}

	// Connection-level exception callback
	@Override
	public void onException(JMSException exception) {
		System.err.println("*** onException :: connection exception message :: ***"
				+ exception.getMessage());
		exception.printStackTrace();
	}

}






Re: producer exception handling for 10m idle TimeOut -> "qpid-jms-client - 0.32.0"

Posted by Robbie Gemmell <ro...@gmail.com>.
You can't throw exceptions from the listener because it is being run on
an internal client thread, not an application thread, so it makes no
sense for it to throw as no one would be around to catch it. If you
wanted your sending thread to see an exception from the callback, you
would need to have your CompletionListener coordinate with the sending
thread in order to pass it the result/exception details in some fashion
to throw.
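
One possible shape for that coordination, sketched with JDK classes only. SendOutcome is a hypothetical helper, not part of JMS or qpid-jms: the CompletionListener callbacks would call completed() or failed(), and the sending thread would call awaitOrThrow() after send(). The executor in main() only stands in for the internal client thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: hands a send result from the callback thread back to the sender.
// SendOutcome is a hypothetical helper, not part of JMS or qpid-jms.
final class SendOutcome {
    private final CompletableFuture<Void> result = new CompletableFuture<>();

    // Intended to be called from CompletionListener.onCompletion(Message).
    void completed() {
        result.complete(null);
    }

    // Intended to be called from CompletionListener.onException(Message, Exception).
    void failed(Exception cause) {
        result.completeExceptionally(cause);
    }

    // Called on the sending thread: returns normally on success, or rethrows
    // the callback's exception wrapped in a RuntimeException.
    void awaitOrThrow(long timeout, TimeUnit unit) {
        try {
            result.get(timeout, unit);
        } catch (ExecutionException e) {
            throw new RuntimeException("send failed", e.getCause());
        } catch (TimeoutException e) {
            throw new RuntimeException("no send result within timeout", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while waiting for send result", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the client's internal completion thread.
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        SendOutcome outcome = new SendOutcome();
        dispatcher.execute(() -> outcome.failed(new Exception("connection lost")));
        try {
            outcome.awaitOrThrow(5, TimeUnit.SECONDS);
        } catch (RuntimeException e) {
            System.out.println("sender saw: " + e.getCause().getMessage());
        } finally {
            dispatcher.shutdown();
        }
    }
}
```

Note that waiting like this after every single send amounts to sending synchronously, so it only pays off if the sender does other work between the send and the wait.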

I'm not sure why you are using asynchronous sends with a
CompletionListener. It doesn't seem like you actually want to send
asynchronously, or really know how to, and you want to throw
exceptions synchronously when the sends fail, which the regular
synchronous send method should do for you (*small print: assuming you
are sending persistent messages, which I think you are, and not using
transactions, which I think you aren't...or use the connection options
to force everything to sync send). I'd suggest you try things out
there again, using master. I'm going to start releasing 0.33.0
tomorrow most likely.

Robbie

On 7 June 2018 at 13:10, akabhishek1 <ma...@gmail.com> wrote:
> Hi Robbie and team,
>
> I need one more important suggestion from you. We are getting a connection
> exception in the "onException(Message message, Exception exception)" block
> while sending messages to the ServiceBus broker.
>
> The problem is that "onException(Message message, Exception exception)" is
> an overridden method in our CompletionHandler, so we can't throw any
> checked exception.
>
> We have a requirement to throw an exception so that the user is aware of
> the payload and the exception details.
>
> Could you please suggest how we can throw a custom exception from the
> "onException(Message message, Exception exception)" block?
>
> Regards,
> Abhishek Kumar



Re: producer exception handling for 10m idle TimeOut -> "qpid-jms-client - 0.32.0"

Posted by akabhishek1 <ma...@gmail.com>.
Hi Robbie and team,

I need one more important suggestion from you. We are getting a connection
exception in the "onException(Message message, Exception exception)" block
while sending messages to the ServiceBus broker.

The problem is that "onException(Message message, Exception exception)" is
an overridden method in our CompletionHandler, so we can't throw any
checked exception.

We have a requirement to throw an exception so that the user is aware of
the payload and the exception details.

Could you please suggest how we can throw a custom exception from the
"onException(Message message, Exception exception)" block?

Regards,
Abhishek Kumar


