You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@axis.apache.org by ve...@apache.org on 2010/05/23 20:05:47 UTC

svn commit: r947453 - in /axis/axis2/java/transports/trunk/modules: base/src/main/java/org/apache/axis2/transport/base/ base/src/main/java/org/apache/axis2/transport/base/datagram/ udp/src/main/java/org/apache/axis2/transport/udp/

Author: veithen
Date: Sun May 23 18:05:46 2010
New Revision: 947453

URL: http://svn.apache.org/viewvc?rev=947453&view=rev
Log:
Applied Hiranya's patch for AXIS2-4722 (AXIS2-4722-update2.patch) without changes.

Modified:
    axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
    axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListenerEx.java
    axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
    axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
    axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPListener.java
    axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java

Modified: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java?rev=947453&r1=947452&r2=947453&view=diff
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java (original)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java Sun May 23 18:05:46 2010
@@ -173,7 +173,7 @@ public abstract class AbstractTransportL
             state = BaseConstants.STARTED;
             // register to receive updates on services for lifetime management
             // cfgCtx.getAxisConfiguration().addObservers(axisObserver);
-            log.info(getTransportName().toUpperCase() + " Listener started");
+            log.info(getTransportName().toUpperCase() + " listener started");
             // iterate through deployed services and start
             serviceTracker.start();
         }

Modified: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListenerEx.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListenerEx.java?rev=947453&r1=947452&r2=947453&view=diff
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListenerEx.java (original)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListenerEx.java Sun May 23 18:05:46 2010
@@ -45,6 +45,8 @@ public abstract class AbstractTransportL
     /** A Map of service name to the protocol endpoints */
     private List<E> endpoints = new ArrayList<E>();
 
+    protected boolean useGlobalListener = false;
+
     @Override
     public void init(ConfigurationContext cfgCtx,
             TransportInDescription transportIn) throws AxisFault {
@@ -112,6 +114,8 @@ public abstract class AbstractTransportL
         if (endpoint.loadConfiguration(service)) {
             startEndpoint(endpoint);
             endpoints.add(endpoint);
+        } else if (useGlobalListener) {
+            return;
         } else {
             throw new AxisFault("Service doesn't have configuration information for transport " +
                     getTransportName());

Modified: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java?rev=947453&r1=947452&r2=947453&view=diff
==============================================================================
--- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java (original)
+++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java Sun May 23 18:05:46 2010
@@ -38,6 +38,14 @@ public abstract class AbstractDatagramTr
             throws AxisFault {
         
         super.init(cfgCtx, transportIn);
+        initDispatcher();
+    }
+
+    private void initDispatcher() throws AxisFault {
+        if (dispatcher != null) {
+            return;
+        }
+
         DatagramDispatcherCallback callback = new DatagramDispatcherCallback() {
 
             public void receive(SocketAddress address,
@@ -47,11 +55,13 @@ public abstract class AbstractDatagramTr
                 workerPool.execute(new ProcessPacketTask(address, endpoint, data, length));
             }
         };
+
         try {
             dispatcher = createDispatcher(callback);
         } catch (IOException ex) {
             throw new AxisFault("Unable to create selector", ex);
         }
+        
         try {
             defaultIp = org.apache.axis2.util.Utils.getIpAddress(cfgCtx.getAxisConfiguration());
         } catch (SocketException ex) {
@@ -70,6 +80,8 @@ public abstract class AbstractDatagramTr
 
     @Override
     protected void startEndpoint(E endpoint) throws AxisFault {
+        initDispatcher();
+
         try {
             dispatcher.addEndpoint(endpoint);
         } catch (IOException ex) {
@@ -77,9 +89,10 @@ public abstract class AbstractDatagramTr
                     + endpoint.getEndpointReferences(defaultIp)[0], ex);
         }
         if (log.isDebugEnabled()) {
-            log.debug("Started listening on endpoint " + endpoint.getEndpointReferences(defaultIp)[0]
-                    + " [contentType=" + endpoint.getContentType()
-                    + "; service=" + endpoint.getServiceName() + "]");
+            log.debug("Started listening on endpoint " +
+                    endpoint.getEndpointReferences(defaultIp)[0] +
+                    " [contentType=" + endpoint.getContentType() +
+                    "; service=" + endpoint.getServiceName() + "]");
         }
     }
     
@@ -88,7 +101,8 @@ public abstract class AbstractDatagramTr
         try {
             dispatcher.removeEndpoint(endpoint);
         } catch (IOException ex) {
-            log.error("I/O exception while stopping listener for service " + endpoint.getServiceName(), ex);
+            log.error("I/O exception while stopping listener for service " +
+                    endpoint.getServiceName(), ex);
         }
     }
 

Modified: axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java?rev=947453&r1=947452&r2=947453&view=diff
==============================================================================
--- axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java (original)
+++ axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/IODispatcher.java Sun May 23 18:05:46 2010
@@ -50,7 +50,7 @@ import org.apache.commons.logging.LogFac
  * packet received, a {@link ProcessPacketTask} instance is created
  * and dispatched to a worker thread from the configured pool.
  * <p>
- * The methods {@link #addEndpoint(Endpoint)}, {@link #removeEndpoint(String)}
+ * The methods {@link #addEndpoint(Endpoint)}, {@link #removeEndpoint(Endpoint)}
  * and {@link #stop()} are thread safe and may be called from any thread.
  * However, to avoid concurrency issues, the operation on the underlying
  * {@link Selector} will always be executed by the thread executing the
@@ -88,7 +88,8 @@ public class IODispatcher implements Dat
     
     private final DatagramDispatcherCallback callback;
     private final Selector selector;
-    private final Queue<SelectorOperation> selectorOperationQueue = new ConcurrentLinkedQueue<SelectorOperation>();
+    private final Queue<SelectorOperation> selectorOperationQueue =
+            new ConcurrentLinkedQueue<SelectorOperation>();
     
     /**
      * Constructor.
@@ -120,6 +121,7 @@ public class IODispatcher implements Dat
                 channel.register(selector, SelectionKey.OP_READ, endpoint);
             }
         });
+        log.info("UDP endpoint started on port : " + endpoint.getPort());
     }
     
     /**

Modified: axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPListener.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPListener.java?rev=947453&r1=947452&r2=947453&view=diff
==============================================================================
--- axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPListener.java (original)
+++ axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPListener.java Sun May 23 18:05:46 2010
@@ -39,11 +39,18 @@ import org.apache.axis2.transport.base.d
  *       than the specified length will be truncated.</dd>
  * </dl>
  * 
- * @see org.apache.synapse.transport.udp
+ * @see org.apache.axis2.transport.udp
  */
-public class UDPListener extends AbstractDatagramTransportListener<Endpoint> implements ManagementSupport {
+public class UDPListener extends AbstractDatagramTransportListener<Endpoint>
+        implements ManagementSupport {
+
+    public UDPListener() {
+        this.useGlobalListener = true;
+    }
+
     @Override
-    protected IODispatcher createDispatcher(DatagramDispatcherCallback callback) throws IOException {
+    protected IODispatcher createDispatcher(DatagramDispatcherCallback callback)
+            throws IOException {
     	IODispatcher dispatcher = new IODispatcher(callback);
     	new Thread(dispatcher, getTransportName() + "-dispatcher").start();
         // Start a new thread for the I/O dispatcher

Modified: axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java?rev=947453&r1=947452&r2=947453&view=diff
==============================================================================
--- axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java (original)
+++ axis/axis2/java/transports/trunk/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPSender.java Sun May 23 18:05:46 2010
@@ -24,8 +24,6 @@ import java.io.ByteArrayInputStream;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
-import java.net.SocketAddress;
-import java.nio.channels.DatagramChannel;
 import java.nio.ByteBuffer;
 
 import org.apache.axiom.om.OMOutputFormat;
@@ -49,7 +47,7 @@ import javax.xml.stream.XMLStreamExcepti
 /**
  * Transport sender for the UDP protocol.
  * 
- * @see org.apache.synapse.transport.udp
+ * @see org.apache.axis2.transport.udp
  */
 public class UDPSender extends AbstractTransportSender {
     public UDPSender() {
@@ -57,15 +55,18 @@ public class UDPSender extends AbstractT
     }
     
     @Override
-    public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
+    public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut)
+            throws AxisFault {
         super.init(cfgCtx, transportOut);
     }
     
     @Override
-    public void sendMessage(MessageContext msgContext, String targetEPR, OutTransportInfo outTransportInfo) throws AxisFault {
+    public void sendMessage(MessageContext msgContext, String targetEPR,
+                            OutTransportInfo outTransportInfo) throws AxisFault {
         if ((targetEPR == null) && (outTransportInfo != null)) {
             // this can happen only on the server side; send the message using the back channel
-            DatagramOutTransportInfo datagramOutTransportInfo = (DatagramOutTransportInfo) outTransportInfo;
+            DatagramOutTransportInfo datagramOutTransportInfo =
+                    (DatagramOutTransportInfo) outTransportInfo;
             MessageFormatter messageFormatter = TransportUtils.getMessageFormatter(msgContext);
             OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
             format.setContentType(datagramOutTransportInfo.getContentType());
@@ -74,14 +75,17 @@ public class UDPSender extends AbstractT
             ByteBuffer byteBuffer = ByteBuffer.allocate(payload.length);
             byteBuffer.put(payload);
 
-            DatagramSocket socket = null;
+            DatagramSocket socket;
             try {
                 socket = new DatagramSocket();
-                socket.send(new DatagramPacket(payload, payload.length, datagramOutTransportInfo.getSourceAddress()));
+                try {
+                    socket.send(new DatagramPacket(payload, payload.length,
+                            datagramOutTransportInfo.getSourceAddress()));
+                } finally {
+                    socket.close();
+                }
             } catch (IOException e) {
                 throw new AxisFault("Unable to send packet", e);
-            } finally {
-                socket.close();
             }
 
         } else {
@@ -93,8 +97,10 @@ public class UDPSender extends AbstractT
             try {
                 DatagramSocket socket = new DatagramSocket();
                 try {
-                    socket.send(new DatagramPacket(payload, payload.length, InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort()));
-                    if (!msgContext.getOptions().isUseSeparateListener() && !msgContext.isServerSide()){
+                    socket.send(new DatagramPacket(payload, payload.length,
+                            InetAddress.getByName(udpOutInfo.getHost()), udpOutInfo.getPort()));
+                    if (!msgContext.getOptions().isUseSeparateListener() &&
+                            !msgContext.isServerSide()){
                         waitForReply(msgContext, socket, udpOutInfo.getContentType());
                     }
                 }
@@ -108,12 +114,13 @@ public class UDPSender extends AbstractT
         }
     }
 
-    private void waitForReply(MessageContext messageContext, DatagramSocket datagramSocket, String contentType) throws IOException {
+    private void waitForReply(MessageContext messageContext, DatagramSocket datagramSocket,
+                              String contentType) throws IOException {
 
         // piggy back message constant is used to pass a piggy back
         // message context in async model
         if (!(messageContext.getAxisOperation() instanceof OutInAxisOperation) &&
-                (messageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) == null)) {
+                messageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) == null) {
             return;
         }