Posted to users@servicemix.apache.org by estrnod <es...@annealsoft.com> on 2006/12/22 00:12:48 UTC

Endpoint Determination Made By Flow in a Clustered Environment?

We have an application which is using ServiceMix 3.0-incubating with Spring
2.0 and ActiveMQ 4.1.0-incubator.  We are also building with Jencks 1.3. (It
is running under ServiceMix, not a J2EE app.)  Each ServiceMix component has
as its destinationService a broker component that determines (by
content-based routing) which service to send a message to next in a flow
that will ultimately use several components.

Here is a snippet illustrating how the message is sent from the broker
component to the next dynamic destination:

                    InOnly messageExchange = getDeliveryChannel()
                            .createExchangeFactory().createInOnlyExchange();
                    NormalizedMessage out = messageExchange.createMessage();

                    out.setContent(new StringSource(req.toXML()));
                    out.setProperty(req.getClass().getName(), req);
                    messageExchange.setInMessage(out);
                    messageExchange.setService(new QName(
                            "http://my.com/myApp", "myService", "myPrefix"));

                    getDeliveryChannel().send(messageExchange);

At this level, it would appear to resolve only to a service, not a
particular endpoint.

In deployment, I am running the app on two machines on the same
subnet with the same configuration (all services are available on each
machine).  I start the process on one installation and want to see
messages forwarded to the other installation for processing, for
load balancing.

The machines used in testing run different OSes and VMs, but all use
Java 1.5.0.x.  Because a simple setup of the container using a unique name
and flowName="jms" running on different VMs caused the following exception,
I started investigating an internal ActiveMQ broker that could be
configured to send messages in some form other than serialized objects.

2006-12-21 12:59:46,419 [ActiveMQ Session Task] ERROR -
JMSFlow$3.onMessage(291) | Error processing incoming broadcast message
javax.jms.JMSException: Failed to build body from bytes. Reason:
java.io.InvalidClassException: javax.xml.namespace.QName; local class
incompatible: stream classdesc serialVersionUID = -9120448754896609940,
local class serialVersionUID = 4418622981026545151

I finally ended up with the following setup (some of which may need to be
culled at this point) which has the homogenous nodes connecting and
negotiating a transport format, but all of the messages are still being
routed to components on the initiating node only.

Here is the current servicemix setup I have (gleaned from many sources over
the last couple of weeks and modified to use Spring):

=================================================

  <!-- the JBI container ("name" must be unique if running in a cluster) -->
  <sm:container 
  		name="${HOSTNAME}"
        id="jbi"
        createJmxConnector="false"
        MBeanServer="#mbeanServer"
        dumpStats="true"
        statsInterval="10"
        persistent="true" depends-on="localbroker">

  <sm:flow>
    <bean class="org.apache.servicemix.jbi.nmr.flow.jms.JMSFlow">
      <property name="connectionFactory" ref="localFactory" />
    </bean>
  </sm:flow>   
     
    <sm:activationSpecs>

  	  <sm:activationSpec componentName="brokerHttpReceiver" 	
  						   service="myPrefix:brokerHttpBinding"
  						   endpoint="brokerHttpReceiver"
  						   destinationService="myPrefix:broker">
  		  <sm:component>
  		    <bean xmlns="http://xbean.org/schemas/spring/1.0"
  		          class="org.apache.servicemix.components.http.HttpConnector">
			<property name="host" value="localhost"/>
			<property name="port" value="8913"/>
			<property name="defaultInOut" value="false"/>
  		    </bean>
  		  </sm:component>
  	  </sm:activationSpec>

      <sm:activationSpec componentName="IPTransactionBroker"
service="myPrefix:broker">
        <sm:component>
          <bean class="com.my.components.broker.IPTransactionBroker">
            <property name="workManager" ref="workManager" />
            <property name="definitionsManager" ref="IPTXDefinitionsManager"
/>
            <property name="transactionManager" ref="ipTransactionManager"
/>
          </bean>
        </sm:component>
      </sm:activationSpec>
...

  <!-- Most other components with destinationService="myPrefix:broker" are
dynamically loaded -->

    </sm:activationSpecs>
  </sm:container>      

  <!-- the work manager (thread pool) for the entire JBI container -->
  <bean id="workManager" class="org.jencks.factory.WorkManagerFactoryBean">
    <property name="threadPoolSize" value="30" />
  </bean>
  
   <!-- additions for clustering -->
   <bean id="transactionManagerImpl"
class="org.jencks.factory.TransactionManagerFactoryBean">
      <property name="defaultTransactionTimeoutSeconds" value="600"/>

      <property name="transactionLog">
         <bean
class="org.apache.geronimo.transaction.log.UnrecoverableLog"/>
      </property> 
   </bean>

   <bean id="transactionContextManager"
class="org.jencks.factory.TransactionContextManagerFactoryBean">
      <property name="transactionManager" ref="transactionManagerImpl"/>
   </bean>

   <bean id="connectionTracker"
     
class="org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTrackingCoordinator"/>
  
   <bean id="connectionManager"
class="org.jencks.factory.ConnectionManagerFactoryBean">
      <property name="transactionContextManager"
ref="transactionContextManager"/>
      <property name="connectionTracker" ref="connectionTracker"/>
   </bean>

   <bean id="bootstrapContext"
class="org.jencks.factory.BootstrapContextFactoryBean">
      <property name="workManager" ref="workManager"/>
   </bean>
   
   <bean id="jmsResourceAdapter"
class="org.apache.activemq.ra.ActiveMQResourceAdapter" singleton="true">
      <property name="connectionFactory" ref="localFactory"/>
      <property name="serverUrl" value="peer://localBroker-${HOSTNAME}"/>
<!-- see
http://incubator.apache.org/activemq/resource-adapter-properties.html -->
   </bean> 
     
  <bean id="jencks" class="org.jencks.JCAContainer">
     <property name="bootstrapContext" ref="bootstrapContext"/>
     <property name="resourceAdapter" ref="jmsResourceAdapter"/>
   </bean>

<!-- JMX server and connector -->
  <bean id="mbeanServer"
class="org.springframework.jmx.support.MBeanServerFactoryBean"/>

  <bean id="registry" class="mx4j.tools.naming.NamingService"
init-method="start">
    <!-- tell each RMI registry to run on its own port! -->
    <property name="port" value="1092"/>
  </bean>

  <bean id="serverConnector"
class="org.springframework.jmx.support.ConnectorServerFactoryBean"
depends-on="registry">
    <property name="objectName" value="connector:name=rmi"/>
    <property name="serviceUrl"
value="service:jmx:rmi:///jndi/rmi://localhost:1092/jmxrmi"/>
    <property name="threaded" value="true"/>
    <property name="daemon" value="true"/>
  </bean>

  <bean id="managementContext"
class="org.apache.activemq.broker.jmx.ManagementContext">
    <property name="MBeanServer" ref="mbeanServer"/>
  </bean>    	
      	
  <bean id="localbroker" class="org.apache.activemq.broker.BrokerService"
    init-method="start">
      <property name="brokerName" value="localBroker-${HOSTNAME}"/>
      <property name="persistent" value="true"/>
      <property name="managementContext" ref="managementContext"/>
      <property name="useJmx" value="true"/>

 <!-- see http://en.wikipedia.org/wiki/Time_to_live ; also
      http://incubator.apache.org/activemq/activemq-3-transport-configurations.html
      (timeToLive (int) defaults to 1 so we don't send multicast messages
      beyond our own local subnet) -->

<!--using distributed queues, having equivalent weighting to queue receivers
across the network, 
    but only when the receivers are active -
http://www.activemq.org/site/networks-of-brokers.html -->
    <property name="networkConnectors">
    	<list>
    		<bean class="org.apache.activemq.network.DiscoveryNetworkConnector">
    			<property name="uri" value="multicast://ipc"/>
			<!-- set TTL to 64 to attempt to get working also over VPN (unsuccessful)
-->
    			<property name="networkTTL" value="64"/>
         	    <property name="name" value="queues_only"/>
         		<property name="dynamicOnly" value="true"/>
                <property name="conduitSubscriptions" value="false"/>
                <property name="decreaseNetworkConsumerPriority"
value="false"/>
				<property name="excludedDestinations">
					<list>
						<bean class="org.apache.activemq.command.ActiveMQTopic">
							<property name="physicalName" value=">"/>
						</bean>
					</list>
      			</property>
            </bean>
        </list>
    </property>

      <property name="transportConnectors">
      	<list>
        	<bean class="org.apache.activemq.broker.TransportConnector">
        	    <property name="name" value="openwire"/>
        		<property name="uri" value="tcp://localhost:61616"/>
        		<property name="discoveryUri"
value="multicast://ipc?timeToLive=64"/>
        	</bean>
        	<bean class="org.apache.activemq.broker.TransportConnector">
        	    <property name="name" value="stomp"/>
        		<property name="uri" value="stomp://localhost:61613"/>
        	</bean>  
        	<!--
        	<bean class="org.apache.activemq.broker.TransportConnector">
        	    <property name="name" value="ssl"/>
        		<property name="uri" value="ssl://localhost:61617"/>
        	</bean>  
        	-->    	
        </list>
      </property>

	<!-- to facilitate testing... -->
	<!-- mostly from ActiveMQ example, which does send from producer on one
machine to consumer on another: -->
      <property name="destinationPolicy">
      	<bean class="org.apache.activemq.broker.region.policy.PolicyMap">
      		<property name="defaultEntry">
      			<bean class="org.apache.activemq.broker.region.policy.PolicyEntry">
      				<property name="dispatchPolicy">     				
      					<bean
class="org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy"/>
      				</property>	
      				<property name="destination">
      					<bean class="org.apache.activemq.command.ActiveMQQueue">
      						<constructor-arg value="org.apache.servicemix.jms.${HOSTNAME}"/>
      					</bean>
      				</property>
      				<property name="subscriptionRecoveryPolicy">
      					<bean
class="org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy"/>
      				</property>
      			</bean>
      		</property>
      	</bean>
      </property>
      
      <!-- to facilitate testing... -->
      <bean factory-method="getBroker" factory-bean="ipc_jbi">
  		<property name="defaultServiceChooser">
    		<bean class="org.apache.servicemix.jbi.resolver.RandomChoicePolicy" />
   		</property>
	  </bean> 
      <!-- -->
  </bean>

  <!-- JMS ConnectionFactory to use local broker -->
  <!-- use depends-on to prevent creating JMS connections before broker
starts; 
  	   see http://www.activemq.org/site/vm-transport-reference.html -->
  <bean id="localFactory" depends-on="localbroker"
    class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="peer://localBroker-${HOSTNAME}" />
    <property name="statsEnabled" value="true"/> 

    <!-- to facilitate testing... -->
    <property name="prefetchPolicy">
    	<bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
    		<property name="queuePrefetch" value="1"/>
    		<property name="inputStreamPrefetch" value="1"/>
    		<property name="queueBrowserPrefetch" value="1"/>
    	</bean>
    </property>

  </bean> 
  
</beans>
=================================================

Some questions, then:

1)  Am I right to expect the transactions (messages) to be routed to other
installations of this configuration on the same subnet?

2)  Should I be able to see MBeans for all the queues for all started
installations when using jconsole to view
"service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"
(or "service:jmx:rmi:///jndi/rmi://localhost:1092/jmxrmi", setup this way to
prevent an error "ObjID already in use"), or just the local queues and then
connections for the other installations?  (Does JMX even matter for
ServiceMix routing to clustered nodes?)

3)  I found the following possibly very relevant posting on the "ServiceMix
- Dev" list:
http://www.nabble.com/-jira--Updated%3A-%28SM-344%29-Improve-clustering-by-allowing-implicit-endpoint-selection-to-be-done-by-the-flow-tf1358361s12049.html#a3638632
- was this ever addressed? (The JIRA posting's status is still "Open".)

4)  If the improvement in (3) is not done, what else should I be doing to
cause the behavior that I am seeking?

Thanks for any light that anyone can shed on this problem.

- Ellen



-- 
View this message in context: http://www.nabble.com/Endpoint-Determination-Made-By-Flow-in-a-Clustered-Environment--tf2868246s12049.html#a8016723
Sent from the ServiceMix - User mailing list archive at Nabble.com.


Re: Endpoint Determination Made By Flow in a Clustered Environment?

Posted by apinke <ap...@gmail.com>.
Hi Ellen ,

I too am trying to set up load balancing between 2 SM instances on different
machines.
I am running into various issues where messages don't get sent to the other
machine.

Your ServiceMix configuration was pretty helpful, but I was wondering if you
could provide a more detailed explanation of how you managed to set up
clustering and load balancing.

I think a consolidated document on the wiki would be really helpful for all
the folks who are interested in setting up multiple SM instances for load
balancing.

thanks
Pat


-- 
View this message in context: http://www.nabble.com/Endpoint-Determination-Made-By-Flow-in-a-Clustered-Environment--tf2868246s12049.html#a8173226


Re: Endpoint Determination Made By Flow in a Clustered Environment?

Posted by estrnod <es...@annealsoft.com>.
I have finally resolved several issues and am now seeing some messages sent
to and processed on the remote server!  As you said, a big problem ended up
being the different point releases of the JVM (1.5.0_06 vs. 1.5.0_10). 

Another problem seemed to be that I had multiple transport connectors
specified for the broker and was not consistently using the same one for
everything (e.g., using "peer://localBroker-${HOSTNAME}"  for the connection
factory.)  This seemed to cause more than one ActiveMQ instance to be
started up in the same JVM.  After I specified the JMS URL & broker URI
consistently as tcp://localhost:61616 and only saw one ActiveMQ being
started, I was then seeing the same message being processed on both servers,
but this problem vanished after I removed all transport connectors from my
broker except the one I was using with tcp (above.) 

Thank you so much for your help,
Ellen
-- 
View this message in context: http://www.nabble.com/Endpoint-Determination-Made-By-Flow-in-a-Clustered-Environment--tf2868246s12049.html#a8170506


Re: Endpoint Determination Made By Flow in a Clustered Environment?

Posted by Guillaume Nodet <gn...@gmail.com>.
First, the QName deserialization problem is due to incompatible versions
of the QName class, at least between JDK 1.4 and 5.  This may be
a big problem in your case.
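
One way to check for this kind of mismatch is to print the serialVersionUID
that each node's JVM computes for QName and compare the numbers with the two
values in the InvalidClassException. The following is a minimal sketch using
only the standard library; the class name QNameUidCheck and the helper
serialVersionUidOf are illustrative names, not part of ServiceMix or ActiveMQ.

```java
import java.io.ObjectStreamClass;
import javax.xml.namespace.QName;

class QNameUidCheck {

    /** Returns the serialVersionUID this JVM uses for the given serializable class. */
    static long serialVersionUidOf(Class<?> type) {
        // lookup() returns null when the class does not implement Serializable.
        ObjectStreamClass descriptor = ObjectStreamClass.lookup(type);
        if (descriptor == null) {
            throw new IllegalArgumentException(type.getName() + " is not serializable");
        }
        return descriptor.getSerialVersionUID();
    }

    public static void main(String[] args) {
        // Run this on every node in the cluster and compare the output.
        System.out.println("java.version=" + System.getProperty("java.version")
                + " QName serialVersionUID=" + serialVersionUidOf(QName.class));
    }
}
```

If two nodes print different numbers, Java serialization of any object that
contains a QName will fail between them in the way shown in the stack trace.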

On 12/22/06, estrnod <es...@annealsoft.com> wrote:
>
>
> 1)  Am I right to expect the transactions (messages) to be routed to other
> installations of this configuration on the same subnet?

I would think they should.  You can first check that remote endpoints
are properly discovered by increasing the log level of the jms flow to
debug.  You should see some messages like:
   [container]: adding remote endpoint: [endpoint]

>
> 2)  Should I be able to see MBeans for all the queues for all started
> installations when using jconsole to view
> "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"
> (or "service:jmx:rmi:///jndi/rmi://localhost:1092/jmxrmi", setup this way to
> prevent an error "ObjID already in use"), or just the local queues and then
> connections for the other installations?  (Does JMX even matter for
> ServiceMix routing to clustered nodes?)

You should only see the queues for remote endpoints
once the flow has sent a message to them.

>
> 3)  I found the following possibly very relevant posting on the "ServiceMix
> - Dev" list:
> http://www.nabble.com/-jira--Updated%3A-%28SM-344%29-Improve-clustering-by-allowing-implicit-endpoint-selection-to-be-done-by-the-flow-tf1358361s12049.html#a3638632
> - was this ever addressed? (The JIRA posting's status is still "Open".)

So I assume the problem is that you have the same service defined on several
instances of servicemix, but messages are always routed to the local
one, right?
There are two different cases here:
  * if the same JBI endpoint is activated on a remote servicemix instance,
    they will be considered a single endpoint with respect to implicit
    endpoint selection
  * if you have different endpoint names for a single service, be it
    local or remote, they will be considered different endpoints.
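
The two cases can be illustrated with a small stand-alone model. This is only
a sketch of the grouping rule described in the list, not ServiceMix code: the
Activation class, the groupByEndpoint helper, the node names and the
"service/endpoint" key format are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class EndpointGrouping {

    /** Hypothetical record of one activated endpoint; not a ServiceMix type. */
    static final class Activation {
        final String container;
        final String service;
        final String endpoint;

        Activation(String container, String service, String endpoint) {
            this.container = container;
            this.service = service;
            this.endpoint = endpoint;
        }
    }

    /** Groups containers by service/endpoint name, ignoring where each one runs. */
    static Map<String, List<String>> groupByEndpoint(List<Activation> activations) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Activation a : activations) {
            // Assumed key format; the thread only says "service/endpoint name".
            String key = a.service + "/" + a.endpoint;
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(a.container);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Case 1: same endpoint name on both nodes, so one logical endpoint.
        List<Activation> clustered = Arrays.asList(
                new Activation("nodeA", "myService", "worker"),
                new Activation("nodeB", "myService", "worker"));
        // Case 2: different endpoint names, so two separate endpoints.
        List<Activation> distinct = Arrays.asList(
                new Activation("nodeA", "myService", "workerA"),
                new Activation("nodeB", "myService", "workerB"));

        System.out.println(groupByEndpoint(clustered));
        System.out.println(groupByEndpoint(distinct));
    }
}
```

In the first case the model yields a single entry shared by both nodes; in the
second it yields one entry per endpoint name.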
So now, before the exchange is given to the flow, a target endpoint is selected.
The default policy is to choose the first endpoint found for the
selected service.
The exchange will be sent to a queue whose name is the service/endpoint name.
If your endpoints are clustered (remote endpoints have the same name),
then load balancing should occur.  Note that it may appear that ActiveMQ is
optimized and will prefer routing to a local consumer instead of a remote one
(I don't remember how to customize this behavior).
If the endpoints have different names, you may want to change the policy
used to choose an endpoint to a random policy to avoid always choosing
the same endpoint (which may be local).
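
The difference between the default "first endpoint found" behavior and a
random policy can be sketched as follows. This is a self-contained model, not
the ServiceMix API: the Chooser interface and the endpoint names are
hypothetical. In ServiceMix itself the random policy is the
org.apache.servicemix.jbi.resolver.RandomChoicePolicy class referenced in the
configuration earlier in this thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

class EndpointChoosers {

    /** Hypothetical stand-in for an endpoint selection policy. */
    interface Chooser {
        String choose(List<String> endpoints);
    }

    /** Models the described default: always the first endpoint found. */
    static final Chooser FIRST = endpoints -> endpoints.get(0);

    /** Picks uniformly at random, so no single endpoint is always preferred. */
    static Chooser random(Random rng) {
        return endpoints -> endpoints.get(rng.nextInt(endpoints.size()));
    }

    public static void main(String[] args) {
        // Two differently named endpoints for the same service (assumed names).
        List<String> endpoints = Arrays.asList("workerA@nodeA", "workerB@nodeB");
        Chooser randomChooser = random(new Random());
        for (int i = 0; i < 5; i++) {
            System.out.println("first=" + FIRST.choose(endpoints)
                    + " random=" + randomChooser.choose(endpoints));
        }
    }
}
```

With the first-match chooser every exchange goes to the same endpoint, which
matches the symptom of all messages staying on the initiating node when that
endpoint happens to be the local one.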

This JIRA is about doing load balancing before the endpoint is selected.
This has not been implemented yet, but you would still have the same kind
of problems (though the different behaviors between same endpoint names
vs different endpoint names would not happen).

>
> 4)  If the improvement in (3) is not done, what else should I be doing to
> cause the behavior that I am seeking?
>
> Thanks for any light that anyone can shed on this problem.
>
> - Ellen


-- 
Cheers,
Guillaume Nodet