You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by ch...@apache.org on 2003/10/21 16:24:40 UTC

cvs commit: incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio NonBlockingChannel.java NonBlockingServer.java

chirino     2003/10/21 07:24:40

  Modified:    modules/core/src/java/org/apache/geronimo/remoting/transport/async
                        AbstractServer.java Channel.java ChannelPool.java
               modules/core/src/java/org/apache/geronimo/remoting/transport/async/bio
                        BlockingChannel.java BlockingServer.java
               modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio
                        NonBlockingChannel.java NonBlockingServer.java
  Log:
  Fix the async transport so that it does not hang when a server establishes a back channel to the client.
  
  Revision  Changes    Path
  1.3       +2 -1      incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/AbstractServer.java
  
  Index: AbstractServer.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/AbstractServer.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- AbstractServer.java	29 Aug 2003 19:16:53 -0000	1.2
  +++ AbstractServer.java	21 Oct 2003 14:24:39 -0000	1.3
  @@ -81,6 +81,7 @@
           Long.parseLong(System.getProperty("org.apache.geronimo.remoting.transport.async.connection_timeout", "300000"));
       // 5 min.
   
  +    private HashMap uriTo= new HashMap();
       private HashMap channelPools = new HashMap();
   
       /**
  
  
  
  1.2       +2 -2      incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/Channel.java
  
  Index: Channel.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/Channel.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Channel.java	22 Aug 2003 02:23:26 -0000	1.1
  +++ Channel.java	21 Oct 2003 14:24:39 -0000	1.2
  @@ -81,7 +81,7 @@
        * @throws IOException
        * @throws ConnectionFailedException
        */
  -    public void open(URI uri, ChannelListner listner) throws TransportException;
  +    public void open(URI uri, URI backConnectURI, ChannelListner listner) throws TransportException;
   
       /**
        * starts an accepted connection.
  
  
  
  1.4       +35 -11    incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/ChannelPool.java
  
  Index: ChannelPool.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/ChannelPool.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- ChannelPool.java	6 Sep 2003 13:52:11 -0000	1.3
  +++ ChannelPool.java	21 Oct 2003 14:24:39 -0000	1.4
  @@ -83,7 +83,8 @@
   
       private static final Log log = LogFactory.getLog(ChannelPool.class);
   
  -    private final URI uri;
  +    private final URI remoteURI;
  +    private URI backConnectURI;
       private final List available = new ArrayList();
       private final Correlator responseManager = new Correlator();
       private Router dispatcher;
  @@ -96,10 +97,18 @@
        * @param uri
        */
       public ChannelPool(URI uri, Router dispatcher) {
  -        this.uri = uri;
  +        this.remoteURI = uri;
           this.dispatcher = dispatcher;
  +        try {
  +            if (Registry.instance.getServerForClientRequest() == null) {
  +                backConnectURI = new URI("async://localhost:0");
  +            } else {
  +                backConnectURI = Registry.instance.getServerForClientRequest().getClientConnectURI();
  +            }
  +        } catch (Exception e) {
  +        }
       }
  -
  +    
       public void dispose() {
           Iterator iterator;
           synchronized (available) {
  @@ -138,9 +147,9 @@
               createdChannelCount++;
           }
   
  -        public void open(URI uri) throws TransportException, TransportException {
  +        public void open(URI uri, URI localuri) throws TransportException, TransportException {
               try {
  -                next.open(uri, this);
  +                next.open(uri, localuri, this);
               } catch (TransportException e) {
                   doCloseInternal = true;
                   throw e;
  @@ -286,15 +295,15 @@
               } while (!maxOpenConnections.attempt(100));
   
           } catch (InterruptedException e1) {
  -            throw new TransportException("(" + uri + "): " + e1);
  +            throw new TransportException("(" + remoteURI + "): " + e1);
           }
   
           // not available, make one on demand
           try {
   
  -            log.debug("channel connecting to: " + uri);
  +            log.debug("channel connecting to: " + remoteURI);
               PooledAsynchChannel c = new PooledAsynchChannel(TransportFactory.instance.createAsynchChannel());
  -            c.open(uri);
  +            c.open(remoteURI, backConnectURI);
   
               return c;
           } catch (Exception e) {
  @@ -302,9 +311,9 @@
               maxOpenConnections.release();
               log.debug("Connect Failed: ", e);
               if (log.isDebugEnabled()) {
  -                log.debug("channel connection to: " + uri + " failed", e);
  +                log.debug("channel connection to: " + remoteURI + " failed", e);
               }
  -            throw new TransportException("(" + uri + "): " + e);
  +            throw new TransportException("(" + remoteURI + "): " + e);
           }
       }
   
  @@ -481,6 +490,21 @@
   
       public int getCreatedChannelCount() {
           return createdChannelCount;
  +    }
  +
  +
  +    /**
  +     * @return Returns the backConnectURI.
  +     */
  +    public URI getBackConnectURI() {
  +        return backConnectURI;
  +    }
  +
  +    /**
  +     * @param backConnectURI The backConnectURI to set.
  +     */
  +    public void setBackConnectURI(URI backConnectURI) {
  +        this.backConnectURI = backConnectURI;
       }
   
   }
  
  
  
  1.2       +27 -20    incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/bio/BlockingChannel.java
  
  Index: BlockingChannel.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/bio/BlockingChannel.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- BlockingChannel.java	22 Aug 2003 02:23:26 -0000	1.1
  +++ BlockingChannel.java	21 Oct 2003 14:24:39 -0000	1.2
  @@ -62,7 +62,6 @@
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.OutputStream;
  -import java.io.StreamCorruptedException;
   import java.net.InetAddress;
   import java.net.InetSocketAddress;
   import java.net.URI;
  @@ -84,7 +83,6 @@
   import org.apache.geronimo.remoting.transport.async.AsyncMsg;
   import org.apache.geronimo.remoting.transport.async.Channel;
   import org.apache.geronimo.remoting.transport.async.ChannelListner;
  -import org.apache.geronimo.remoting.transport.async.Registry;
   /**
    * The Blocking implementation of the AsynchChannel interface.  
    * 
  @@ -99,13 +97,15 @@
       private ChannelListner listner;
       private Thread worker;
       private SocketChannel socketChannel;
  -    private URI remoteURI;
       private boolean closing = false;
   
       private Inflater inflator;
       private Deflater deflater;
   
  -    public void open(URI remoteURI, ChannelListner listner) throws TransportException {
  +    private URI remoteURI;
  +    private URI requestedURI;
  +
  +    public void open(URI remoteURI, URI backConnectURI, ChannelListner listner) throws TransportException {
   
           if (log.isTraceEnabled())
               log.trace("Connecting to : " + remoteURI);
  @@ -132,10 +132,7 @@
   
               DataOutputStream out = new DataOutputStream(socketChannel.socket().getOutputStream());
               out.writeUTF(remoteURI.toString());
  -            if (Registry.instance.getServerForClientRequest() == null)
  -                out.writeUTF("async://" + socketChannel.socket().getLocalAddress().getHostAddress() + ":0");
  -            else
  -                out.writeUTF(Registry.instance.getServerForClientRequest().getClientConnectURI().toString());
  +            out.writeUTF(backConnectURI.toString());
               out.flush();
               
               if (compression != -1) {
  @@ -165,15 +162,14 @@
           // the source vm.  Could be null.
           String sourceURI = in.readUTF();
           this.remoteURI = new URI(sourceURI);
  +        this.requestedURI = new URI(destURI);
           if (log.isTraceEnabled()) {
  -            log.trace("Connected from : " + remoteURI);
  -            log.trace("Request URI    : " + destURI);
  +            log.trace("Remote URI    : " + remoteURI);
  +            log.trace("Requested URI : " + requestedURI);
           }
  -
  -        // What options did the client want to use with this connection?		
  -        URI rruri = new URI(destURI);
  +        
           boolean enableTcpNoDelay = true;
  -        Properties params = URISupport.parseQueryParameters(rruri);
  +        Properties params = URISupport.parseQueryParameters(requestedURI);
           enableTcpNoDelay = params.getProperty("tcp.nodelay", "true").equals("true");
           int compression = Integer.parseInt((String) params.getProperty("compression", "-1"));
   
  @@ -278,8 +274,9 @@
                   log.trace("Waiting for message");
                   message[0].clear();
                   socketChannel.read(message[0]);
  -                if( message[0].position()!=4 ) 
  -                    throw new StreamCorruptedException("Did not receive the full message header.");
  +                while( message[0].position()!=4 ) {
  +                    socketChannel.read(message[0]);
  +                }
                   message[0].flip();
                   int size = message[0].getInt();
                   
  @@ -296,13 +293,15 @@
                   message[1].clear();
                   message[1].limit(size);
                   socketChannel.read(message[1]);
  -                if( message[1].position()!=size ) 
  -                    throw new StreamCorruptedException("Did not receive the full message body.");
  +                while( message[1].position()!=size ) {
  +                    socketChannel.read(message[1]);
  +                }
                   message[1].flip();                
                   listner.receiveEvent(deserialize(message));
               }
  +            log.trace("Stopping due to remote end closing.");
           } catch (IOException e) {
  -            // The remote end died on us.
  +            log.trace("Stopping due to exception.", e);
           } finally {
               asyncClose();
           }
  @@ -393,4 +392,12 @@
       public URI getRemoteURI() {
           return remoteURI;
       }
  +    
  +    /**
  +     * @return Returns the requestedURI.
  +     */
  +    public URI getRequestedURI() {
  +        return requestedURI;
  +    }
  +
   }
  
  
  
  1.4       +4 -1      incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/bio/BlockingServer.java
  
  Index: BlockingServer.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/bio/BlockingServer.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- BlockingServer.java	30 Aug 2003 14:49:25 -0000	1.3
  +++ BlockingServer.java	21 Oct 2003 14:24:39 -0000	1.4
  @@ -188,11 +188,14 @@
                       continue;
                   }
                   try {
  +                    
                       socket.socket().setTcpNoDelay(enableTcpNoDelay);
                       BlockingChannel channel = new BlockingChannel();
                       channel.init(connectURI, socket);
                       ChannelPool pool = getChannelPool(channel.getRemoteURI());
  +                    pool.setBackConnectURI( channel.getRequestedURI() );
                       pool.associate(channel);
  +                    
                   } catch (TransportException ie) {
                       log.debug("Client connection could not be accepted: ", ie);
                   } catch (IOException ie) {
  
  
  
  1.2       +18 -7     incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio/NonBlockingChannel.java
  
  Index: NonBlockingChannel.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio/NonBlockingChannel.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- NonBlockingChannel.java	22 Aug 2003 02:23:27 -0000	1.1
  +++ NonBlockingChannel.java	21 Oct 2003 14:24:39 -0000	1.2
  @@ -84,7 +84,6 @@
   import org.apache.geronimo.remoting.transport.async.AsyncMsg;
   import org.apache.geronimo.remoting.transport.async.Channel;
   import org.apache.geronimo.remoting.transport.async.ChannelListner;
  -import org.apache.geronimo.remoting.transport.async.Registry;
   
   import EDU.oswego.cs.dl.util.concurrent.Mutex;
   /**
  @@ -110,7 +109,9 @@
       private SelectorManager selectorManager;
       private SelectionKey selectionKey;
   
  -    public void open(URI remoteURI, ChannelListner listner) throws TransportException {
  +    private URI requestedURI;
  +
  +    public void open(URI remoteURI, URI backConnectURI, ChannelListner listner) throws TransportException {
   
           if (log.isTraceEnabled())
               log.trace("Connecting to : " + remoteURI);
  @@ -137,10 +138,13 @@
   
               DataOutputStream out = new DataOutputStream(socketChannel.socket().getOutputStream());
               out.writeUTF(remoteURI.toString());
  +            out.writeUTF(backConnectURI.toString());
  +            /*
               if (Registry.instance.getServerForClientRequest() == null)
                   out.writeUTF("async://" + socketChannel.socket().getLocalAddress().getHostAddress() + ":0");
               else
                   out.writeUTF(Registry.instance.getServerForClientRequest().getClientConnectURI().toString());
  +            */
               out.flush();
   
               if (compression != -1) {
  @@ -173,15 +177,15 @@
           // the source vm.  Could be null.
           String sourceURI = in.readUTF();
           this.remoteURI = new URI(sourceURI);
  +        this.requestedURI = new URI(destURI);
           if (log.isTraceEnabled()) {
  -            log.trace("Connected from : " + remoteURI);
  -            log.trace("Request URI    : " + destURI);
  +            log.trace("Remote URI    : " + remoteURI);
  +            log.trace("Requested URI : " + requestedURI);
           }
   
           // What options did the client want to use with this connection?		
  -        URI rruri = new URI(destURI);
           boolean enableTcpNoDelay = true;
  -        Properties params = URISupport.parseQueryParameters(rruri);
  +        Properties params = URISupport.parseQueryParameters(requestedURI);
           enableTcpNoDelay = params.getProperty("tcp.nodelay", "true").equals("true");
           int compression = Integer.parseInt((String) params.getProperty("compression", "-1"));
   
  @@ -468,6 +472,13 @@
               log.debug("Communications error, closing connection: ", e);
               asyncClose();
           }
  +    }
  +
  +    /**
  +     * @return
  +     */
  +    public URI getRequestedURI() {
  +        return requestedURI;
       }
   
   }
  
  
  
  1.4       +2 -1      incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio/NonBlockingServer.java
  
  Index: NonBlockingServer.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/core/src/java/org/apache/geronimo/remoting/transport/async/nio/NonBlockingServer.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- NonBlockingServer.java	30 Aug 2003 14:49:25 -0000	1.3
  +++ NonBlockingServer.java	21 Oct 2003 14:24:39 -0000	1.4
  @@ -192,6 +192,7 @@
                       NonBlockingChannel channel = new NonBlockingChannel();
                       channel.init(connectURI, socketChannel);
                       ChannelPool pool = getChannelPool(channel.getRemoteURI());
  +                    pool.setBackConnectURI( channel.getRequestedURI() );
                       pool.associate(channel);
                   } catch (TransportException ie) {
                       log.debug("Client connection could not be accepted: ", ie);