You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/02/27 18:40:59 UTC

svn commit: r748614 - in /cxf/branches/2.0.x-fixes: ./ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/main/resources/schemas/wsdl/

Author: dkulp
Date: Fri Feb 27 17:40:59 2009
New Revision: 748614

URL: http://svn.apache.org/viewvc?rev=748614&view=rev
Log:
Merged revisions 748608 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/branches/2.1.x-fixes

................
  r748608 | dkulp | 2009-02-27 12:23:32 -0500 (Fri, 27 Feb 2009) | 10 lines
  
  Merged revisions 748604 via svnmerge from 
  https://svn.apache.org/repos/asf/cxf/trunk
  
  ........
    r748604 | dkulp | 2009-02-27 12:17:33 -0500 (Fri, 27 Feb 2009) | 3 lines
    
    [CXF-2034] Add lifecycle listener to make sure the listeners are shut down
    If the user specifies a CorrelationID and the destination cannot handle that, bring up a new temporary queue to handle it to avoid a "hang".
  ........
................

Modified:
    cxf/branches/2.0.x-fixes/   (props changed)
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd

Propchange: cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 27 17:40:59 2009
@@ -1,3 +1,3 @@
-/cxf/branches/2.1.x-fixes:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816,682902,682951,683089,683290,683318,684099,684790-684793,684842,684862,684895-684918,685205,685253,686237,686283,686299,686333-686364,686765,686827,687097,687464-687465,689109,689112,689122,691316,691357,691491,691711,691715,691745,692162-692163,692468,692500,694466-694469,694472,694717,694748-694749,694870,695503,695509,695553,695555,695563,695875-695877,695940,695980,696436,696455,696721,697086,698129,701526,701634,702275,702443,702527,702582,702604,702610,702642-702643,702649,702760,702870,702873,702959,703193,703242,703523,704303,704587,704738,704998,705153,705280-705449,705455,709357,709641,709644,710177,710184,711736,712199,712225,712275,712600,712896,713083,713410,713413,713594,713599,713808,713901,714169-714171,718622,718929,719211,719221-719223,7192
 96,719300-719301,719303,719308,719332,719356,719363,719369-719383,719650,719695,720124,723545,724403-724404,724421,724448,724451,724486-724487,724714,725367,725371,725763,725774,726045,726048,726106,726123,726745-726746,726749,726754,726756-726758,726995,727794,727797-727798,727800,731676,731684,731686-731688,731690,733587,733873,733876,733884,733891,733893,733915,735132,735136,735789,736451,736453,736456,736761,736765,736771,736827,736853,737126,737706,737714,737716,737726,737729-737730,737732,737734,737737,737861,738167,738181,738202,738206,738208,738243,738245,738588-738594,739379,739916,739920-739921,739930,740043,740190,740195,740326,740328,740959,741153-741156,741159,741163,741190,743496,743498-743499,743501-743502,743504,743512,743516,743968,744173,744181,745593,745596,745598,745600,745604-745605,745607,745639,745954,746013,746021-746022,747459,747461-747464,747466,747471,747477,747817,748248,748324,748605,748607
-/cxf/trunk:651669-686342,686344-686363,686764,686820,687096,687387,687463,688086,688102,688735,691271,691355,691488,691602,691706,691728,692116,692157,692466,692499,693653,693819,694179,694263,694417,694716,694744,694747,694869,695396,695484,695537,695552,695561,695619,695684,695835,695935,695977,696094,696433,696720,697085,698128,700261,700602,701783,701830,701862,702267,702580,702602,702609,702616,702656,702957,703191,703239,703501,704584,704997,705150,705274,705340,705446,708550,708554,709353-709354,709425,710150,712194,712198,712272,712312,712670,712893,713082,713584,713597,713804,713899,714167-714168,718281,718565,718620,718640,718665,719017,719210,719215-719218,719222,719273,719327-719354,719362,719368,719382,719649,719680,720119-720217,723338,723717-723791,724334-724371,724433-724438,724449,724481,724485,724668,724782,724795,725754,725773,725799,725839,726342,726524,726631,726637,726639,726692,726724,726992,727445,727692,727754,727792,730139,731598,731604,731615,73163
 1,731635,732320,732363,732411,732710,732773,732827,732829,733582,734666,734836,735734,736332,736343,736352,736408,736423,736491,736738,736766,736825,736852,737032,737069,737124,737237,737299,737356,737494,737498,737855,738166,738178,738201,738242,738244,739367,739799-739800,739876-739877,739922,739937,740154,740186,740197,740255,740810,740844,740886,740902,740923,741124,741152,741188,741521,741529,741879,743087,743439,743441,743459,743469,743965,743967,744175,744195,744756,745214,745243,745560,745578,745586,745638,745950,746012,746019-746020,746306,746357,746373-746374,747217,747386,747406,747454,747766,748223,748305,748313,748603
+/cxf/branches/2.1.x-fixes:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816,682902,682951,683089,683290,683318,684099,684790-684793,684842,684862,684895-684918,685205,685253,686237,686283,686299,686333-686364,686765,686827,687097,687464-687465,689109,689112,689122,691316,691357,691491,691711,691715,691745,692162-692163,692468,692500,694466-694469,694472,694717,694748-694749,694870,695503,695509,695553,695555,695563,695875-695877,695940,695980,696436,696455,696721,697086,698129,701526,701634,702275,702443,702527,702582,702604,702610,702642-702643,702649,702760,702870,702873,702959,703193,703242,703523,704303,704587,704738,704998,705153,705280-705449,705455,709357,709641,709644,710177,710184,711736,712199,712225,712275,712600,712896,713083,713410,713413,713594,713599,713808,713901,714169-714171,718622,718929,719211,719221-719223,7192
 96,719300-719301,719303,719308,719332,719356,719363,719369-719383,719650,719695,720124,723545,724403-724404,724421,724448,724451,724486-724487,724714,725367,725371,725763,725774,726045,726048,726106,726123,726745-726746,726749,726754,726756-726758,726995,727794,727797-727798,727800,731676,731684,731686-731688,731690,733587,733873,733876,733884,733891,733893,733915,735132,735136,735789,736451,736453,736456,736761,736765,736771,736827,736853,737126,737706,737714,737716,737726,737729-737730,737732,737734,737737,737861,738167,738181,738202,738206,738208,738243,738245,738588-738594,739379,739916,739920-739921,739930,740043,740190,740195,740326,740328,740959,741153-741156,741159,741163,741190,743496,743498-743499,743501-743502,743504,743512,743516,743968,744173,744181,745593,745596,745598,745600,745604-745605,745607,745639,745954,746013,746021-746022,747459,747461-747464,747466,747471,747477,747817,748248,748324,748605,748607-748608
+/cxf/trunk:651669-686342,686344-686363,686764,686820,687096,687387,687463,688086,688102,688735,691271,691355,691488,691602,691706,691728,692116,692157,692466,692499,693653,693819,694179,694263,694417,694716,694744,694747,694869,695396,695484,695537,695552,695561,695619,695684,695835,695935,695977,696094,696433,696720,697085,698128,700261,700602,701783,701830,701862,702267,702580,702602,702609,702616,702656,702957,703191,703239,703501,704584,704997,705150,705274,705340,705446,708550,708554,709353-709354,709425,710150,712194,712198,712272,712312,712670,712893,713082,713584,713597,713804,713899,714167-714168,718281,718565,718620,718640,718665,719017,719210,719215-719218,719222,719273,719327-719354,719362,719368,719382,719649,719680,720119-720217,723338,723717-723791,724334-724371,724433-724438,724449,724481,724485,724668,724782,724795,725754,725773,725799,725839,726342,726524,726631,726637,726639,726692,726724,726992,727445,727692,727754,727792,730139,731598,731604,731615,73163
 1,731635,732320,732363,732411,732710,732773,732827,732829,733582,734666,734836,735734,736332,736343,736352,736408,736423,736491,736738,736766,736825,736852,737032,737069,737124,737237,737299,737356,737494,737498,737855,738166,738178,738201,738242,738244,739367,739799-739800,739876-739877,739922,739937,740154,740186,740197,740255,740810,740844,740886,740902,740923,741124,741152,741188,741521,741529,741879,743087,743439,743441,743459,743469,743965,743967,744175,744195,744756,745214,745243,745560,745578,745586,745638,745950,746012,746019-746020,746306,746357,746373-746374,747217,747386,747406,747454,747766,748223,748305,748313,748603-748604
 /incubator/cxf/trunk:434594-651668

Propchange: cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=748614&r1=748613&r2=748614&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Fri Feb 27 17:40:59 2009
@@ -24,6 +24,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.lang.ref.WeakReference;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -35,6 +36,9 @@
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
+import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
+import org.apache.cxf.buslifecycle.BusLifeCycleManager;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
@@ -54,6 +58,7 @@
  * Message. This is then provided in the Exchange and also sent to the incomingObserver
  */
 public class JMSConduit extends AbstractConduit implements JMSExchangeSender, MessageListener {
+
     static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
     
     private static final String CORRELATED = JMSConduit.class.getName() + ".correlated";
@@ -62,8 +67,10 @@
     private JMSConfiguration jmsConfig;
     private Map<String, Exchange> correlationMap;
     private DefaultMessageListenerContainer jmsListener;
+    private DefaultMessageListenerContainer allListener;
     private String conduitId;
     private AtomicLong messageCount;
+    private JMSBusLifeCycleListener listener;
 
     public JMSConduit(EndpointInfo endpointInfo, EndpointReferenceType target, JMSConfiguration jmsConfig) {
         super(target);
@@ -108,12 +115,32 @@
             .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
 
         JmsTemplate jmsTemplate = JMSFactory.createJmsTemplate(jmsConfig, headers);
-        if (!exchange.isOneWay() && jmsListener == null) {
-            jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getReplyDestination(), 
-                                                       conduitId);
+        String userCID = headers != null ? headers.getJMSCorrelationID() : null;
+        DefaultMessageListenerContainer jmsList = jmsListener;
+        if (!exchange.isOneWay()) {
+            if (userCID == null || !jmsConfig.isUseConduitIdSelector()) { 
+                if (jmsListener == null) {
+                    jmsListener = JMSFactory.createJmsListener(jmsConfig, this, 
+                                                               jmsConfig.getReplyDestination(), 
+                                                               conduitId, 
+                                                               false);
+                    addBusListener(exchange.get(Bus.class));
+                }
+                jmsList = jmsListener;
+            } else {
+                if (allListener == null) {
+                    allListener = JMSFactory.createJmsListener(jmsConfig, 
+                                                               this, 
+                                                               null, 
+                                                               null, 
+                                                               true);
+                    addBusListener(exchange.get(Bus.class));
+                }
+                jmsList = allListener;
+            }
         }
         
-        final javax.jms.Destination replyTo = exchange.isOneWay() ? null : jmsListener.getDestination();
+        final javax.jms.Destination replyTo = exchange.isOneWay() ? null : jmsList.getDestination();
 
         final String correlationId = (headers != null && headers.isSetJMSCorrelationID()) 
             ? headers.getJMSCorrelationID() 
@@ -154,8 +181,6 @@
                         throw new RuntimeException("Timeout receiving message with correlationId "
                                                    + correlationId);
                     }
-                    
-                    
                 }
             }
         } else {
@@ -163,6 +188,46 @@
         }
     }
 
+    static class JMSBusLifeCycleListener implements BusLifeCycleListener {
+        final WeakReference<JMSConduit> ref;
+        BusLifeCycleManager blcm;
+        JMSBusLifeCycleListener(JMSConduit c, BusLifeCycleManager b) {
+            ref = new WeakReference<JMSConduit>(c);
+            blcm = b;
+            blcm.registerLifeCycleListener(this);
+        }
+        
+        public void initComplete() {
+        }
+
+        public void postShutdown() {
+        }
+
+        public void preShutdown() {
+            unreg();
+            blcm = null;
+            JMSConduit c = ref.get();
+            if (c != null) {
+                c.listener = null;
+                c.close();
+            }
+        }
+        public void unreg() {
+            if (blcm != null) {
+                blcm.unregisterLifeCycleListener(this);
+            }
+        }
+    }
+    private synchronized void addBusListener(Bus bus) {
+        if (listener == null && bus != null) {
+            BusLifeCycleManager blcm = bus.getExtension(BusLifeCycleManager.class);
+            if (blcm != null) {
+                listener = new JMSBusLifeCycleListener(this,
+                                                       blcm);
+            }
+        }
+    }
+
     /**
      * When a message is received on the reply destination the correlation map is searched for the
      * correlationId. If it is found the message is converted to a CXF message and the thread sending the
@@ -208,9 +273,16 @@
     }
 
     public void close() {
+        if (listener != null) {
+            listener.unreg();
+            listener = null;
+        }
         if (jmsListener != null) {
             jmsListener.shutdown();
         }
+        if (allListener != null) {
+            allListener.shutdown();
+        }
         LOG.log(Level.FINE, "JMSConduit closed ");
     }
 
@@ -229,9 +301,16 @@
 
     @Override
     protected void finalize() throws Throwable {
+        if (listener != null) {
+            listener.unreg();
+            listener = null;
+        }
         if (jmsListener != null) {
             jmsListener.shutdown();
         }
+        if (allListener != null) {
+            allListener.shutdown();
+        }
         super.finalize();
     }
 }

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=748614&r1=748613&r2=748614&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Fri Feb 27 17:40:59 2009
@@ -74,7 +74,7 @@
     private String replyDestination;
     private String messageType = JMSConstants.TEXT_MESSAGE_TYPE;
     private boolean pubSubDomain;
-    private boolean useConduitIdSelector = true;
+    private Boolean useConduitIdSelector;
     private String conduitSelectorPrefix = "";
     private boolean autoResolveDestination;
     private long recoveryInterval = DEFAULT_VALUE;
@@ -332,9 +332,15 @@
     }
 
     public boolean isUseConduitIdSelector() {
+        if (useConduitIdSelector == null) {
+            return true;
+        }
         return useConduitIdSelector;
     }
-
+    public boolean isSetUseConduitIdSelector() {
+        return useConduitIdSelector != null;
+    }
+    
     public int getMaxConcurrentTasks() {
         return maxConcurrentTasks;
     }

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=748614&r1=748613&r2=748614&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Fri Feb 27 17:40:59 2009
@@ -102,7 +102,8 @@
         org.apache.cxf.common.i18n.Message msg = 
             new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_DESTINATION", LOG, name);
         jmsConfig.ensureProperlyConfigured(msg);
-        jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getTargetDestination(), null);
+        jmsListener = JMSFactory.createJmsListener(jmsConfig, this, 
+                                                   jmsConfig.getTargetDestination(), null, true);
     }
 
     public void deactivate() {
@@ -196,6 +197,10 @@
     }
 
     public void sendExchange(Exchange exchange, final Object replyObj) {
+        if (exchange.isOneWay()) {
+            //Don't need to send anything
+            return;
+        }
         Message inMessage = exchange.getInMessage();
         final Message outMessage = exchange.getOutMessage();
         if (jmsConfig.isPubSubDomain()) {

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=748614&r1=748613&r2=748614&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Fri Feb 27 17:40:59 2009
@@ -117,7 +117,8 @@
     public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig,
                                                                     MessageListener listenerHandler,
                                                                     String destinationName, 
-                                                                    String messageSelectorPrefix) {
+                                                                    String messageSelectorPrefix,
+                                                                    boolean userCID) {
         DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11()
             ? new DefaultMessageListenerContainer() : new DefaultMessageListenerContainer102();
         jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
@@ -141,13 +142,15 @@
             jmsListener.setCacheLevel(jmsConfig.getCacheLevel());
         }
         String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
-        if (messageSelectorPrefix != null && jmsConfig.isUseConduitIdSelector()) {
-            jmsListener.setMessageSelector("JMSCorrelationID LIKE '" 
+        if (!userCID || jmsConfig.isSetUseConduitIdSelector()) {
+            if (messageSelectorPrefix != null && jmsConfig.isUseConduitIdSelector()) {
+                jmsListener.setMessageSelector("JMSCorrelationID LIKE '" 
                                             + staticSelectorPrefix 
                                             + messageSelectorPrefix + "%'");
-        } else if (staticSelectorPrefix.length() > 0) {
-            jmsListener.setMessageSelector("JMSCorrelationID LIKE '" 
+            } else if (staticSelectorPrefix.length() > 0) {
+                jmsListener.setMessageSelector("JMSCorrelationID LIKE '" 
                                             + staticSelectorPrefix +  "%'");
+            }
         }
         if (jmsConfig.getDestinationResolver() != null) {
             jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java?rev=748614&r1=748613&r2=748614&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java Fri Feb 27 17:40:59 2009
@@ -96,7 +96,9 @@
             //if (clientConfig.isSetClientReceiveTimeout()) {
             jmsConfig.setReceiveTimeout(clientConfig.getClientReceiveTimeout());
             //}
-            jmsConfig.setUseConduitIdSelector(clientConfig.isUseConduitIdSelector());
+            if (clientConfig.isSetUseConduitIdSelector()) {
+                jmsConfig.setUseConduitIdSelector(clientConfig.isUseConduitIdSelector());
+            }
             if (clientConfig.isSetConduitSelectorPrefix()) {
                 jmsConfig.setConduitSelectorPrefix(clientConfig.getConduitSelectorPrefix());
             }

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd?rev=748614&r1=748613&r2=748614&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd Fri Feb 27 17:40:59 2009
@@ -55,7 +55,7 @@
     			<xs:attribute name="messageTimeToLive" type="xs:long"
     				default="0" />
                 <xs:attribute name="conduitSelectorPrefix" type="xs:string" use="optional" default=""/>    				
-                <xs:attribute name="useConduitIdSelector" type="xs:boolean" use="optional" default="true"/>
+                <xs:attribute name="useConduitIdSelector" type="xs:boolean" use="optional"/>
     		</xs:extension>
     	</xs:complexContent>
     </xs:complexType>