Posted to users@activemq.apache.org by Jean-Yves LEBLEU <jl...@gmail.com> on 2010/02/16 16:38:07 UTC

Consumer and Failover

Hi all,

I am trying to use the failover transport
(failover:(tcp://localhost:61618)), and I have some questions:

I ran a test with a simple consumer written in Scala (the code and log
are at the end of this mail).

Scenario:
The broker is stopped.
We start the consumer; it waits for the broker.
We start the broker.
The consumer connects and consumes messages.
We stop the broker and then start it again.
The consumer tries 6 times to reconnect and then stops working.

Any ideas? Thanks for your help.

Regards.
Jean-Yves


Here is the log :

R |16 févr. 2010 16:34:57,730 - Waiting 320 ms before attempting connection.
R |16 févr. 2010 16:34:58,042 - urlList connectionList:[tcp://localhost:61618]
R |16 févr. 2010 16:34:58,042 - Attempting connect to: tcp://localhost:61618
R |16 févr. 2010 16:34:59,245 - Connect fail to:
tcp://localhost:61618, reason: java.net.ConnectException: Connection
refused: connect
R |16 févr. 2010 16:34:59,245 - Stopping transport tcp://null:0
R |16 févr. 2010 16:34:59,245 - Waiting 640 ms before attempting connection.
R |16 févr. 2010 16:34:59,886 - urlList connectionList:[tcp://localhost:61618]
R |16 févr. 2010 16:34:59,886 - Attempting connect to: tcp://localhost:61618
R |16 févr. 2010 16:35:00,964 - Connect fail to:
tcp://localhost:61618, reason: java.net.ConnectException: Connection
refused: connect
R |16 févr. 2010 16:35:00,964 - Stopping transport tcp://null:0
R |16 févr. 2010 16:35:00,964 - Waiting 1280 ms before attempting connection.
R |16 févr. 2010 16:35:02,245 - urlList connectionList:[tcp://localhost:61618]
R |16 févr. 2010 16:35:02,245 - Attempting connect to: tcp://localhost:61618
R |16 févr. 2010 16:35:02,245 - Sending: WireFormatInfo { version=5,
properties={CacheSize=1024, CacheEnabled=true,
SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000,
TcpNoDelayEnabled=true, MaxInactivityDuration=30000,
TightEncodingEnabled=true, StackTraceEnabled=true},
magic=[A,c,t,i,v,e,M,Q]}
R |16 févr. 2010 16:35:02,448 - Received WireFormat: WireFormatInfo {
version=3, properties={CacheSize=1024, CacheEnabled=true,
SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000,
TcpNoDelayEnabled=true, MaxInactivityDuration=30000,
TightEncodingEnabled=true, StackTraceEnabled=true},
magic=[A,c,t,i,v,e,M,Q]}
R |16 févr. 2010 16:35:02,448 - tcp://localhost/127.0.0.1:61618 before
negotiation: OpenWireFormat{version=5, cacheEnabled=false,
stackTraceEnabled=false, tightEncodingEnabled=false,
sizePrefixDisabled=false}
R |16 févr. 2010 16:35:02,495 - tcp://localhost/127.0.0.1:61618 after
negotiation: OpenWireFormat{version=3, cacheEnabled=true,
stackTraceEnabled=true, tightEncodingEnabled=true,
sizePrefixDisabled=false}
R |16 févr. 2010 16:35:02,511 - Connection established
R |16 févr. 2010 16:35:02,511 - Successfully connected to tcp://localhost:61618
Message recieved:
{'RealTimeData':{'reference':'ZInternal_SystemTimeMillis','value':'1266334506073'}}
Message recieved:
{'RealTimeData':{'reference':'ZInternal_SystemTimeMillis','value':'1266334512073'}}
R |16 févr. 2010 16:35:14,339 - Stopping transport
tcp://localhost/127.0.0.1:61618
R |16 févr. 2010 16:35:14,339 - Transport failed to
tcp://localhost:61618 , attempting to automatically reconnect due to:
java.io.EOFException
R |16 févr. 2010 16:35:14,339 - Transport failed with the following exception:
java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:375)
	at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272)
	at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
	at java.lang.Thread.run(Thread.java:619)
R |16 févr. 2010 16:35:14,355 - urlList connectionList:[tcp://localhost:61618]
R |16 févr. 2010 16:35:14,355 - Attempting connect to: tcp://localhost:61618
R |16 févr. 2010 16:35:15,339 - Connect fail to:
tcp://localhost:61618, reason: java.net.ConnectException: Connection
refused: connect
R |16 févr. 2010 16:35:15,339 - Stopping transport tcp://null:0
R |16 févr. 2010 16:35:15,339 - Waiting 10 ms before attempting connection.
R |16 févr. 2010 16:35:15,355 - urlList connectionList:[tcp://localhost:61618]
R |16 févr. 2010 16:35:15,355 - Attempting connect to: tcp://localhost:61618
R |16 févr. 2010 16:35:16,355 - Connect fail to:
tcp://localhost:61618, reason: java.net.ConnectException: Connection
refused: connect
.... tries 6 times and stops
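
The waits in the log above (320, 640, 1280 ms before the first
connection, then 10 ms right after the connection is lost) look like a
doubling backoff that restarts at 10 ms once a connection has
succeeded. Below is a minimal sketch of such a schedule. The names
ReconnectBackoff and delays are made up for illustration, the 30000 ms
cap is an assumption that is not visible in the log, and this is not
the failover transport's actual code.

```scala
object ReconnectBackoff {
  // Endless sequence of waits in milliseconds: start at initialDelayMs,
  // multiply by `multiplier` after each failed attempt, never exceed maxDelayMs.
  def delays(initialDelayMs: Long, multiplier: Double, maxDelayMs: Long): Iterator[Long] =
    Iterator.iterate(initialDelayMs)(d => math.min((d * multiplier).toLong, maxDelayMs))

  def main(args: Array[String]): Unit = {
    // 10 ms start and doubling are read off the log; the 30 s cap is assumed.
    println(delays(10L, 2.0, 30000L).take(8).mkString(", "))
    // prints: 10, 20, 40, 80, 160, 320, 640, 1280
  }
}
```

With these values the sixth, seventh and eighth waits are 320, 640 and
1280 ms, which matches the three waits logged before the first
successful connection.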


-------------- The code ------------------------

import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory

// Subscribes to the "wbm.supervision" topic and prints every text message.
class Consumer(brokerUrl: String) extends MessageListener {
    val factory = new ActiveMQConnectionFactory(brokerUrl)
    val connection = factory.createConnection()
    // Start message delivery on this connection.
    connection.start()
    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)

    val destination = session.createTopic("wbm.supervision")
    val consumer = session.createConsumer(destination)
    // Messages are delivered asynchronously to onMessage below.
    consumer.setMessageListener(this)

    def onMessage(message: Message): Unit = {
        if (message.isInstanceOf[TextMessage]) {
            val textMessage = message.asInstanceOf[TextMessage]
            println("Message recieved: " + textMessage.getText())
        } else {
            println("Oops, not a text message")
        }
    }
}

// Explicit main method, since the Application trait no longer exists
// in recent Scala versions.
object ConsumerApp {
  def main(args: Array[String]): Unit = {
    val consumer = new Consumer("failover:(tcp://localhost:61618)")
  }
}
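
One thing that may be worth ruling out, offered as a hypothesis and not
as something established in this thread: ConsumerApp's main thread
finishes as soon as the Consumer has been constructed, and a JVM exits
once every remaining thread is a daemon thread. Whether the ActiveMQ
client's reconnect threads are daemon threads in 5.2.0 or 5.3.0 is an
assumption that nobody here confirms. A minimal sketch of keeping the
main thread blocked, using only the JDK (the name KeepAlive and its
methods are hypothetical):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object KeepAlive {
  private val stopSignal = new CountDownLatch(1)

  // Blocks the calling thread until requestStop() is called.
  def awaitStop(): Unit = stopSignal.await()

  // Same, but gives up after timeoutMs; returns true if stop was requested.
  def awaitStop(timeoutMs: Long): Boolean =
    stopSignal.await(timeoutMs, TimeUnit.MILLISECONDS)

  // Releases every thread currently blocked in awaitStop.
  def requestStop(): Unit = stopSignal.countDown()
}
```

Calling KeepAlive.awaitStop() right after creating the Consumer in
ConsumerApp would keep one non-daemon thread alive while the broker is
down. If the consumer then keeps retrying past the 6th attempt, that
would support the hypothesis; if it still stops, the cause lies
elsewhere.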

Re: Consumer and Failover

Posted by Gary Tully <ga...@gmail.com>.
Sure, but if you can produce a simple pure Java test case, that would be great.

On 17 February 2010 08:54, Jean-Yves LEBLEU <jl...@gmail.com> wrote:

> Hi all,
>
> Should I open an issue in Jira?
> Regards.
> JY



-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Re: Consumer and Failover

Posted by Jean-Yves LEBLEU <jl...@gmail.com>.
Hi all,

Should I open an issue in Jira?
Regards.
JY


Re: Consumer and Failover

Posted by Jean-Yves LEBLEU <jl...@gmail.com>.
We are using 5.3.0, but we see the same problem with 5.2.0.


On Tue, Feb 16, 2010 at 5:20 PM, James Strachan
<ja...@gmail.com> wrote:
> Which version are you using BTW?

Re: Consumer and Failover

Posted by James Strachan <ja...@gmail.com>.
Which version are you using BTW?




-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://fusesource.com/

Re: Consumer and Failover

Posted by Jean-Yves LEBLEU <jl...@gmail.com>.

James,

Thanks for the more idiomatic Scala code. We are starting to use Scala
instead of Java and have not explored all the Scala subtleties yet :).

I tried maxReconnectAttempts=1000; the consumer still stops after 7
attempts to reconnect to the broker.

Regards.
Jean-Yves

Re: Consumer and Failover

Posted by James Strachan <ja...@gmail.com>.

I wonder if you have not restarted the broker in time for the client
to reconnect? How long does the client take to reconnect and how long
is the broker down for?

Maybe you could try increasing how many times the failover
transport retries before it gives up on connecting...

http://activemq.apache.org/failover-transport-reference.html

e.g. try this URL

failover:(tcp://localhost:61618)?maxReconnectAttempts=1000
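
The URL above follows the failover transport's general shape: a
comma-separated list of broker URIs in parentheses, then the transport
options as a query string. As a rough sketch, such a URL could be
assembled like this. The names FailoverUrl and build are made up for
illustration and are not part of ActiveMQ; the helper does no escaping
or validation.

```scala
object FailoverUrl {
  // Builds "failover:(uri1,uri2)?key1=value1&key2=value2".
  // Options keep the order in which they are passed.
  def build(brokerUris: Seq[String], options: Seq[(String, String)] = Nil): String = {
    val base = brokerUris.mkString("failover:(", ",", ")")
    if (options.isEmpty) base
    else base + options.map { case (k, v) => k + "=" + v }.mkString("?", "&", "")
  }

  def main(args: Array[String]): Unit = {
    println(build(Seq("tcp://localhost:61618"), Seq("maxReconnectAttempts" -> "1000")))
    // prints: failover:(tcp://localhost:61618)?maxReconnectAttempts=1000
  }
}
```

The resulting string is what would be passed to
ActiveMQConnectionFactory in place of the plain
failover:(tcp://localhost:61618) URL.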


BTW slightly more idiomatic Scala code for onMessage would be...

def onMessage(message: Message): Unit = message match {
  case textMessage: TextMessage => println("Message recieved: " + textMessage.getText())
  case _ => println("Oops, not a text message")
}
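
To see that the pattern match behaves the same as the
isInstanceOf/asInstanceOf version without needing a broker, here is a
small self-contained sketch. The Message and TextMessage traits in it
are hypothetical stand-ins for the javax.jms interfaces, and the
functions return the text instead of printing it so the two styles can
be compared.

```scala
// Hypothetical stand-ins for javax.jms.Message and javax.jms.TextMessage.
trait Message
trait TextMessage extends Message { def getText(): String }

object MatchDemo {
  // Style of the original consumer: type test followed by a cast.
  def describeWithCasts(message: Message): String =
    if (message.isInstanceOf[TextMessage]) {
      "Message received: " + message.asInstanceOf[TextMessage].getText()
    } else {
      "Oops, not a text message"
    }

  // Pattern-matching style: the typed pattern tests and casts in one step.
  def describeWithMatch(message: Message): String = message match {
    case textMessage: TextMessage => "Message received: " + textMessage.getText()
    case _ => "Oops, not a text message"
  }
}
```

Both functions give the same result for any input; the match version
simply cannot cast to a type it has not tested for.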