You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/08/07 18:18:07 UTC

svn commit: r429380 - in /incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip: patterns/ContentBasedRouter.java support/AbstractContentBasedRouter.java

Author: gnodet
Date: Mon Aug  7 09:18:06 2006
New Revision: 429380

URL: http://svn.apache.org/viewvc?rev=429380&view=rev
Log:
Refactor the ContentBasedRouter.java with an abstract class, so that it is easier to extend.

Added:
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractContentBasedRouter.java
Modified:
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java?rev=429380&r1=429379&r2=429380&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java (original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/ContentBasedRouter.java Mon Aug  7 09:18:06 2006
@@ -23,12 +23,10 @@
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 
-import org.apache.servicemix.JbiConstants;
-import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.AbstractContentBasedRouter;
 import org.apache.servicemix.eip.support.ExchangeTarget;
 import org.apache.servicemix.eip.support.MessageUtil;
 import org.apache.servicemix.eip.support.RoutingRule;
-import org.apache.servicemix.store.Store;
 
 /**
  * ContentBasedRouter can be used for all kind of content-based routing.
@@ -41,16 +39,12 @@
  * @org.apache.xbean.XBean element="content-based-router"
  *                  description="A Content-Based Router"
  */
-public class ContentBasedRouter extends EIPEndpoint {
+public class ContentBasedRouter extends AbstractContentBasedRouter {
 
     /**
      * Routing rules that are evaluated to find the target destination
      */
     private RoutingRule[] rules;
-    /**
-     * The correlation property used by this component
-     */
-    private String correlation;
     
     /**
      * @return Returns the rules.
@@ -75,8 +69,6 @@
         if (rules == null || rules.length == 0) {
             throw new IllegalArgumentException("rules should contain at least one RoutingRule");
         }
-        // Create correlation property
-        correlation = "AbstractContentBasedRouter.Correlation." + getService() + "." + getEndpoint();
     }
 
     /* (non-Javadoc)
@@ -116,65 +108,6 @@
         }
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
-     */
-    protected void processAsync(MessageExchange exchange) throws Exception {
-        if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
-            exchange.getProperty(correlation) == null) {
-            // Create exchange for target
-            MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
-            if (store.hasFeature(Store.CLUSTERED)) {
-                exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
-                tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
-            }
-            // Set correlations
-            tme.setProperty(correlation, exchange.getExchangeId());
-            exchange.setProperty(correlation, tme.getExchangeId());
-            // Put exchange to store
-            store.store(exchange.getExchangeId(), exchange);
-            // Now copy input to new exchange
-            // We need to read the message once for finding routing target
-            // so ensure we have a re-readable source
-            NormalizedMessage in = MessageUtil.copyIn(exchange);
-            MessageUtil.transferToIn(in, tme); 
-            // Retrieve target
-            ExchangeTarget target = getDestination(tme);
-            target.configureTarget(tme, getContext());
-            // Send in to target
-            send(tme);
-        // Mimic the exchange on the other side and send to needed listener
-        } else {
-            String id = (String) exchange.getProperty(correlation);
-            if (id == null) {
-                throw new IllegalStateException(correlation + " property not found");
-            }
-            MessageExchange org = (MessageExchange) store.load(id);
-            if (org == null) {
-                throw new IllegalStateException("Could not load original exchange with id " + id);
-            }
-            // Reproduce DONE status to the other side
-            if (exchange.getStatus() == ExchangeStatus.DONE) {
-                done(org);
-            // Reproduce ERROR status to the other side
-            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                fail(org, exchange.getError());
-            // Reproduce faults to the other side and listeners
-            } else if (exchange.getFault() != null) {
-                store.store(exchange.getExchangeId(), exchange);
-                MessageUtil.transferTo(exchange, org, "fault"); 
-                send(org);
-            // Reproduce answers to the other side
-            } else if (exchange.getMessage("out") != null) {
-                store.store(exchange.getExchangeId(), exchange);
-                MessageUtil.transferTo(exchange, org, "out"); 
-                send(org);
-            } else {
-                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
-            }
-        }
-    }
-    
     /**
      * Find the target destination for the given JBI exchange
      * @param exchange

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractContentBasedRouter.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractContentBasedRouter.java?rev=429380&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractContentBasedRouter.java (added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractContentBasedRouter.java Mon Aug  7 09:18:06 2006
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.store.Store;
+
+/**
+ * AbstractContentBasedRouter can be used as a base class for content-based routing.
+ * This component implements the  
+ * <a href="http://www.enterpriseintegrationpatterns.com/ContentBasedRouter.html">Content-Based Router</a> 
+ * pattern.
+ * 
+ * @author gnodet
+ * @version $Revision: 376451 $
+ */
+public abstract class AbstractContentBasedRouter extends EIPEndpoint {
+
+    /**
+     * The correlation property used by this component
+     */
+    private String correlation;
+    
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#validate()
+     */
+    public void validate() throws DeploymentException {
+        super.validate();
+        // Create correlation property
+        correlation = "AbstractContentBasedRouter.Correlation." + getService() + "." + getEndpoint();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processSync(MessageExchange exchange) throws Exception {
+        // Create exchange for target
+        MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+        // Now copy input to new exchange
+        // We need to read the message once for finding routing target
+        // so ensure we have a re-readable source
+        NormalizedMessage in = MessageUtil.copyIn(exchange);
+        MessageUtil.transferToIn(in, tme); 
+        // Retrieve target
+        ExchangeTarget target = getDestination(tme);
+        target.configureTarget(tme, getContext());
+        // Send in to target
+        sendSync(tme);
+        // Send back the result
+        if (tme.getStatus() == ExchangeStatus.DONE) {
+            done(exchange);
+        } else if (tme.getStatus() == ExchangeStatus.ERROR) {
+            fail(exchange, tme.getError());
+        } else if (tme.getFault() != null) {
+            Fault fault = MessageUtil.copyFault(tme);
+            done(tme);
+            MessageUtil.transferToFault(fault, exchange);
+            sendSync(exchange);
+        } else if (tme.getMessage("out") != null) {
+            NormalizedMessage out = MessageUtil.copyOut(tme);
+            done(tme);
+            MessageUtil.transferToOut(out, exchange);
+            sendSync(exchange);
+        } else {
+            done(tme);
+            throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+     */
+    protected void processAsync(MessageExchange exchange) throws Exception {
+        if (exchange.getRole() == MessageExchange.Role.PROVIDER &&
+            exchange.getProperty(correlation) == null) {
+            // Create exchange for target
+            MessageExchange tme = exchangeFactory.createExchange(exchange.getPattern());
+            if (store.hasFeature(Store.CLUSTERED)) {
+                exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
+                tme.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
+            }
+            // Set correlations
+            tme.setProperty(correlation, exchange.getExchangeId());
+            exchange.setProperty(correlation, tme.getExchangeId());
+            // Put exchange to store
+            store.store(exchange.getExchangeId(), exchange);
+            // Now copy input to new exchange
+            // We need to read the message once for finding routing target
+            // so ensure we have a re-readable source
+            NormalizedMessage in = MessageUtil.copyIn(exchange);
+            MessageUtil.transferToIn(in, tme); 
+            // Retrieve target
+            ExchangeTarget target = getDestination(tme);
+            target.configureTarget(tme, getContext());
+            // Send in to target
+            send(tme);
+        // Mimic the exchange on the other side and send to needed listener
+        } else {
+            String id = (String) exchange.getProperty(correlation);
+            if (id == null) {
+                throw new IllegalStateException(correlation + " property not found");
+            }
+            MessageExchange org = (MessageExchange) store.load(id);
+            if (org == null) {
+                throw new IllegalStateException("Could not load original exchange with id " + id);
+            }
+            // Reproduce DONE status to the other side
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                done(org);
+            // Reproduce ERROR status to the other side
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                fail(org, exchange.getError());
+            // Reproduce faults to the other side and listeners
+            } else if (exchange.getFault() != null) {
+                store.store(exchange.getExchangeId(), exchange);
+                MessageUtil.transferTo(exchange, org, "fault"); 
+                send(org);
+            // Reproduce answers to the other side
+            } else if (exchange.getMessage("out") != null) {
+                store.store(exchange.getExchangeId(), exchange);
+                MessageUtil.transferTo(exchange, org, "out"); 
+                send(org);
+            } else {
+                throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE + " but has no Out nor Fault message");
+            }
+        }
+    }
+    
+    /**
+     * Find the target destination for the given JBI exchange
+     * @param exchange
+     * @return the target for the given exchange
+     * @throws Exception
+     */
+    protected abstract ExchangeTarget getDestination(MessageExchange exchange) throws Exception;
+
+}