You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/25 17:15:18 UTC

svn commit: r1526202 [2/3] - in /qpid/trunk/qpid/java/amqp-1-0-client: example/src/main/java/org/apache/qpid/amqp_1_0/client/ src/main/java/org/apache/qpid/amqp_1_0/client/

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java?rev=1526202&r1=1526201&r2=1526202&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java Wed Sep 25 15:15:18 2013
@@ -1,412 +1,412 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import javax.net.ssl.SSLSocketFactory;
-import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
-import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.qpid.amqp_1_0.transport.StateChangeListener;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-
-public class Connection
-{
-    private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
-    private static final int MAX_FRAME_SIZE = 65536;
-
-    private String _address;
-    private ConnectionEndpoint _conn;
-    private int _sessionCount;
-
-
-    public Connection(final String address,
-                  final int port,
-                  final String username,
-                  final String password) throws ConnectionException
-    {
-        this(address, port, username, password, MAX_FRAME_SIZE);
-    }
-
-    public Connection(final String address,
-                  final int port,
-                  final String username,
-                  final String password, String remoteHostname) throws ConnectionException
-    {
-        this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname);
-    }
-
-    public Connection(final String address,
-                  final int port,
-                  final String username,
-                  final String password,
-                  final int maxFrameSize) throws ConnectionException
-    {
-        this(address,port,username,password,maxFrameSize,new Container());
-    }
-
-    public Connection(final String address,
-                  final int port,
-                  final String username,
-                  final String password,
-                  final Container container) throws ConnectionException
-    {
-        this(address,port,username,password,MAX_FRAME_SIZE,container);
-    }
-
-    public Connection(final String address,
-                      final int port,
-                      final String username,
-                      final String password,
-                      final int maxFrameSize,
-                      final Container container) throws ConnectionException
-    {
-        this(address,port,username,password,maxFrameSize,container, null);
-    }
-
-    public Connection(final String address,
-                  final int port,
-                  final String username,
-                  final String password,
-                  final int maxFrameSize,
-                  final Container container,
-                  final String remoteHostname) throws ConnectionException
-    {
-        this(address,port,username,password,maxFrameSize,container,remoteHostname,false);
-    }
-
-    public Connection(final String address,
-                  final int port,
-                  final String username,
-                  final String password,
-                  final Container container,
-                  final boolean ssl) throws ConnectionException
-    {
-        this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl);
-    }
-
-    public Connection(final String address,
-                  final int port,
-                  final String username,
-                  final String password,
-                  final String remoteHost,
-                  final boolean ssl) throws ConnectionException
-    {
-        this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl);
-    }
-
-    public Connection(final String address,
-                  final int port,
-                  final String username,
-                  final String password,
-                  final Container container,
-                  final String remoteHost,
-                  final boolean ssl) throws ConnectionException
-    {
-        this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl);
-    }
-
-
-    public Connection(final String address,
-                  final int port,
-                  final String username,
-                  final String password,
-                  final int maxFrameSize,
-                  final Container container,
-                  final String remoteHostname, boolean ssl) throws ConnectionException
-    {
-
-        _address = address;
-
-        try
-        {
-            final Socket s;
-            if(ssl)
-            {
-                s = SSLSocketFactory.getDefault().createSocket(address, port);
-            }
-            else
-            {
-                s = new Socket(address, port);
-            }
-
-
-            Principal principal = username == null ? null : new Principal()
-            {
-
-                public String getName()
-                {
-                    return username;
-                }
-            };
-            _conn = new ConnectionEndpoint(container, principal, password);
-            _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
-            _conn.setRemoteAddress(s.getRemoteSocketAddress());
-            _conn.setRemoteHostname(remoteHostname);
-
-
-
-            ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
-
-
-            final OutputStream outputStream = s.getOutputStream();
-
-            ConnectionHandler.BytesSource src;
-
-            if(_conn.requiresSASL())
-            {
-                ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
-
-                src =  new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
-                                                                                                           (byte)'M',
-                                                                                                           (byte)'Q',
-                                                                                                           (byte)'P',
-                                                                                                           (byte)3,
-                                                                                                           (byte)1,
-                                                                                                           (byte)0,
-                                                                                                           (byte)0),
-                                                                   new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
-                                                                   new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
-                                                                                                           (byte)'M',
-                                                                                                           (byte)'Q',
-                                                                                                           (byte)'P',
-                                                                                                           (byte)0,
-                                                                                                           (byte)1,
-                                                                                                           (byte)0,
-                                                                                                           (byte)0),
-                                                                   new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
-                );
-
-                _conn.setSaslFrameOutput(saslOut);
-            }
-            else
-            {
-                src =  new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
-                                                                                                           (byte)'M',
-                                                                                                           (byte)'Q',
-                                                                                                           (byte)'P',
-                                                                                                           (byte)0,
-                                                                                                           (byte)1,
-                                                                                                           (byte)0,
-                                                                                                           (byte)0),
-                                                                   new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
-                );
-            }
-
-
-            ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
-            Thread outputThread = new Thread(outputHandler);
-            outputThread.setDaemon(true);
-            outputThread.start();
-            _conn.setFrameOutputHandler(out);
-
-
-
-            final ConnectionHandler handler = new ConnectionHandler(_conn);
-            final InputStream inputStream = s.getInputStream();
-
-            Thread inputThread = new Thread(new Runnable()
-            {
-
-                public void run()
-                {
-                    try
-                    {
-                        doRead(handler, inputStream);
-                    }
-                    finally
-                    {
-                        if(_conn.closedForInput() && _conn.closedForOutput())
-                        {
-                            try
-                            {
-                                s.close();
-                            }
-                            catch (IOException e)
-                            {
-                                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                            }
-                        }
-                    }
-                }
-            });
-
-            inputThread.setDaemon(true);
-            inputThread.start();
-
-            _conn.open();
-
-        }
-        catch (IOException e)
-        {
-            throw new ConnectionException(e);
-        }
-
-
-    }
-
-    private Connection(ConnectionEndpoint endpoint)
-    {
-        _conn = endpoint;
-    }
-
-
-    private void doRead(final AMQPTransport transport, final InputStream inputStream)
-    {
-        byte[] buf = new byte[2<<15];
-        ByteBuffer bbuf = ByteBuffer.wrap(buf);
-        final Object lock = new Object();
-        transport.setInputStateChangeListener(new StateChangeListener(){
-
-            public void onStateChange(final boolean active)
-            {
-                synchronized(lock)
-                {
-                    lock.notifyAll();
-                }
-            }
-        });
-
-        try
-        {
-            int read;
-            while((read = inputStream.read(buf)) != -1)
-            {
-                bbuf.position(0);
-                bbuf.limit(read);
-
-                while(bbuf.hasRemaining() && transport.isOpenForInput())
-                {
-                    transport.processBytes(bbuf);
-                }
-
-
-            }
-        }
-        catch (IOException e)
-        {
-            e.printStackTrace();
-        }
-
-    }
-
-    public Session createSession() throws ConnectionException
-    {
-        checkNotClosed();
-        Session session = new Session(this,String.valueOf(_sessionCount++));
-        return session;
-    }
-
-    void checkNotClosed() throws ConnectionClosedException
-    {
-        if(getEndpoint().isClosed())
-        {
-            throw new ConnectionClosedException(getEndpoint().getRemoteError());
-        }
-    }
-
-    public ConnectionEndpoint getEndpoint()
-    {
-        return _conn;
-    }
-
-    public void awaitOpen()
-    {
-        synchronized(getEndpoint().getLock())
-        {
-            while(!getEndpoint().isOpen() && !getEndpoint().isClosed())
-            {
-                try
-                {
-                    getEndpoint().getLock().wait();
-                }
-                catch (InterruptedException e)
-                {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
-            }
-        }
-
-    }
-
-    private void doRead(final ConnectionHandler handler, final InputStream inputStream)
-    {
-        byte[] buf = new byte[2<<15];
-
-
-        try
-        {
-            int read;
-            boolean done = false;
-            while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
-            {
-                ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
-                Binary b = new Binary(buf,0,read);
-
-                if(RAW_LOGGER.isLoggable(Level.FINE))
-                {
-                    RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
-                }
-                while(bbuf.hasRemaining() && !handler.isDone())
-                {
-                    handler.parse(bbuf);
-                }
-
-
-            }
-        }
-        catch (IOException e)
-        {
-            e.printStackTrace();
-        }
-    }
-
-    public void close()
-    {
-        _conn.close();
-
-        synchronized (_conn.getLock())
-        {
-            while(!_conn.closedForInput())
-            {
-                try
-                {
-                    _conn.getLock().wait();
-                }
-                catch (InterruptedException e)
-                {
-
-                }
-            }
-        }
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
+import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.StateChangeListener;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+
+public class Connection
+{
+    private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
+    private static final int MAX_FRAME_SIZE = 65536;
+
+    private String _address;
+    private ConnectionEndpoint _conn;
+    private int _sessionCount;
+
+
+    public Connection(final String address,
+                  final int port,
+                  final String username,
+                  final String password) throws ConnectionException
+    {
+        this(address, port, username, password, MAX_FRAME_SIZE);
+    }
+
+    public Connection(final String address,
+                  final int port,
+                  final String username,
+                  final String password, String remoteHostname) throws ConnectionException
+    {
+        this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname);
+    }
+
+    public Connection(final String address,
+                  final int port,
+                  final String username,
+                  final String password,
+                  final int maxFrameSize) throws ConnectionException
+    {
+        this(address,port,username,password,maxFrameSize,new Container());
+    }
+
+    public Connection(final String address,
+                  final int port,
+                  final String username,
+                  final String password,
+                  final Container container) throws ConnectionException
+    {
+        this(address,port,username,password,MAX_FRAME_SIZE,container);
+    }
+
+    public Connection(final String address,
+                      final int port,
+                      final String username,
+                      final String password,
+                      final int maxFrameSize,
+                      final Container container) throws ConnectionException
+    {
+        this(address,port,username,password,maxFrameSize,container, null);
+    }
+
+    public Connection(final String address,
+                  final int port,
+                  final String username,
+                  final String password,
+                  final int maxFrameSize,
+                  final Container container,
+                  final String remoteHostname) throws ConnectionException
+    {
+        this(address,port,username,password,maxFrameSize,container,remoteHostname,false);
+    }
+
+    public Connection(final String address,
+                  final int port,
+                  final String username,
+                  final String password,
+                  final Container container,
+                  final boolean ssl) throws ConnectionException
+    {
+        this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl);
+    }
+
+    public Connection(final String address,
+                  final int port,
+                  final String username,
+                  final String password,
+                  final String remoteHost,
+                  final boolean ssl) throws ConnectionException
+    {
+        this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl);
+    }
+
+    public Connection(final String address,
+                  final int port,
+                  final String username,
+                  final String password,
+                  final Container container,
+                  final String remoteHost,
+                  final boolean ssl) throws ConnectionException
+    {
+        this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl);
+    }
+
+
+    public Connection(final String address,
+                  final int port,
+                  final String username,
+                  final String password,
+                  final int maxFrameSize,
+                  final Container container,
+                  final String remoteHostname, boolean ssl) throws ConnectionException
+    {
+
+        _address = address;
+
+        try
+        {
+            final Socket s;
+            if(ssl)
+            {
+                s = SSLSocketFactory.getDefault().createSocket(address, port);
+            }
+            else
+            {
+                s = new Socket(address, port);
+            }
+
+
+            Principal principal = username == null ? null : new Principal()
+            {
+
+                public String getName()
+                {
+                    return username;
+                }
+            };
+            _conn = new ConnectionEndpoint(container, principal, password);
+            _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
+            _conn.setRemoteAddress(s.getRemoteSocketAddress());
+            _conn.setRemoteHostname(remoteHostname);
+
+
+
+            ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
+
+
+            final OutputStream outputStream = s.getOutputStream();
+
+            ConnectionHandler.BytesSource src;
+
+            if(_conn.requiresSASL())
+            {
+                ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
+
+                src =  new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+                                                                                                           (byte)'M',
+                                                                                                           (byte)'Q',
+                                                                                                           (byte)'P',
+                                                                                                           (byte)3,
+                                                                                                           (byte)1,
+                                                                                                           (byte)0,
+                                                                                                           (byte)0),
+                                                                   new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
+                                                                   new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+                                                                                                           (byte)'M',
+                                                                                                           (byte)'Q',
+                                                                                                           (byte)'P',
+                                                                                                           (byte)0,
+                                                                                                           (byte)1,
+                                                                                                           (byte)0,
+                                                                                                           (byte)0),
+                                                                   new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+                );
+
+                _conn.setSaslFrameOutput(saslOut);
+            }
+            else
+            {
+                src =  new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
+                                                                                                           (byte)'M',
+                                                                                                           (byte)'Q',
+                                                                                                           (byte)'P',
+                                                                                                           (byte)0,
+                                                                                                           (byte)1,
+                                                                                                           (byte)0,
+                                                                                                           (byte)0),
+                                                                   new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+                );
+            }
+
+
+            ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
+            Thread outputThread = new Thread(outputHandler);
+            outputThread.setDaemon(true);
+            outputThread.start();
+            _conn.setFrameOutputHandler(out);
+
+
+
+            final ConnectionHandler handler = new ConnectionHandler(_conn);
+            final InputStream inputStream = s.getInputStream();
+
+            Thread inputThread = new Thread(new Runnable()
+            {
+
+                public void run()
+                {
+                    try
+                    {
+                        doRead(handler, inputStream);
+                    }
+                    finally
+                    {
+                        if(_conn.closedForInput() && _conn.closedForOutput())
+                        {
+                            try
+                            {
+                                s.close();
+                            }
+                            catch (IOException e)
+                            {
+                                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                            }
+                        }
+                    }
+                }
+            });
+
+            inputThread.setDaemon(true);
+            inputThread.start();
+
+            _conn.open();
+
+        }
+        catch (IOException e)
+        {
+            throw new ConnectionException(e);
+        }
+
+
+    }
+
+    private Connection(ConnectionEndpoint endpoint)
+    {
+        _conn = endpoint;
+    }
+
+
+    private void doRead(final AMQPTransport transport, final InputStream inputStream)
+    {
+        byte[] buf = new byte[2<<15];
+        ByteBuffer bbuf = ByteBuffer.wrap(buf);
+        final Object lock = new Object();
+        transport.setInputStateChangeListener(new StateChangeListener(){
+
+            public void onStateChange(final boolean active)
+            {
+                synchronized(lock)
+                {
+                    lock.notifyAll();
+                }
+            }
+        });
+
+        try
+        {
+            int read;
+            while((read = inputStream.read(buf)) != -1)
+            {
+                bbuf.position(0);
+                bbuf.limit(read);
+
+                while(bbuf.hasRemaining() && transport.isOpenForInput())
+                {
+                    transport.processBytes(bbuf);
+                }
+
+
+            }
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();
+        }
+
+    }
+
+    public Session createSession() throws ConnectionException
+    {
+        checkNotClosed();
+        Session session = new Session(this,String.valueOf(_sessionCount++));
+        return session;
+    }
+
+    void checkNotClosed() throws ConnectionClosedException
+    {
+        if(getEndpoint().isClosed())
+        {
+            throw new ConnectionClosedException(getEndpoint().getRemoteError());
+        }
+    }
+
+    public ConnectionEndpoint getEndpoint()
+    {
+        return _conn;
+    }
+
+    public void awaitOpen()
+    {
+        synchronized(getEndpoint().getLock())
+        {
+            while(!getEndpoint().isOpen() && !getEndpoint().isClosed())
+            {
+                try
+                {
+                    getEndpoint().getLock().wait();
+                }
+                catch (InterruptedException e)
+                {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+            }
+        }
+
+    }
+
+    private void doRead(final ConnectionHandler handler, final InputStream inputStream)
+    {
+        byte[] buf = new byte[2<<15];
+
+
+        try
+        {
+            int read;
+            boolean done = false;
+            while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
+            {
+                ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
+                Binary b = new Binary(buf,0,read);
+
+                if(RAW_LOGGER.isLoggable(Level.FINE))
+                {
+                    RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
+                }
+                while(bbuf.hasRemaining() && !handler.isDone())
+                {
+                    handler.parse(bbuf);
+                }
+
+
+            }
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    public void close()
+    {
+        _conn.close();
+
+        synchronized (_conn.getLock())
+        {
+            while(!_conn.closedForInput())
+            {
+                try
+                {
+                    _conn.getLock().wait();
+                }
+                catch (InterruptedException e)
+                {
+
+                }
+            }
+        }
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java?rev=1526202&r1=1526201&r2=1526202&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java Wed Sep 25 15:15:18 2013
@@ -1,148 +1,148 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-public class Message
-{
-    private Binary _deliveryTag;
-    private List<Section> _payload = new ArrayList<Section>();
-    private Boolean _resume;
-    private boolean _settled;
-    private DeliveryState _deliveryState;
-    private Receiver _receiver;
-
-
-    public Message()
-    {
-    }
-
-    public Message(Collection<Section> sections)
-    {
-        _payload.addAll(sections);
-    }
-
-    public Message(Section section)
-    {
-        this(Collections.singletonList(section));
-    }
-
-    public Message(String message)
-    {
-        this(new AmqpValue(message));
-    }
-
-
-    public Binary getDeliveryTag()
-    {
-        return _deliveryTag;
-    }
-
-    public void setDeliveryTag(Binary deliveryTag)
-    {
-        _deliveryTag = deliveryTag;
-    }
-
-    public List<Section> getPayload()
-    {
-        return Collections.unmodifiableList(_payload);
-    }
-
-    private <T extends Section> T getSection(Class<T> clazz)
-    {
-        for(Section s : _payload)
-        {
-            if(clazz.isAssignableFrom(s.getClass()))
-            {
-                return (T) s;
-            }
-        }
-        return null;
-    }
-
-    public ApplicationProperties getApplicationProperties()
-    {
-        return getSection(ApplicationProperties.class);
-    }
-
-    public Properties getProperties()
-    {
-        return getSection(Properties.class);
-    }
-
-    public Header getHeader()
-    {
-        return getSection(Header.class);
-    }
-
-
-    public void setResume(final Boolean resume)
-    {
-        _resume = resume;
-    }
-
-    public boolean isResume()
-    {
-        return Boolean.TRUE.equals(_resume);
-    }
-
-    public void setDeliveryState(DeliveryState state)
-    {
-        _deliveryState = state;
-    }
-
-    public DeliveryState getDeliveryState()
-    {
-        return _deliveryState;
-    }
-
-    public void setSettled(boolean settled)
-    {
-        _settled = settled;
-    }
-
-    public boolean getSettled()
-    {
-        return _settled;
-    }
-
-    public void setReceiver(final Receiver receiver)
-    {
-        _receiver = receiver;
-    }
-
-    public Receiver getReceiver()
-    {
-        return _receiver;
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class Message
+{
+    private Binary _deliveryTag;
+    private List<Section> _payload = new ArrayList<Section>();
+    private Boolean _resume;
+    private boolean _settled;
+    private DeliveryState _deliveryState;
+    private Receiver _receiver;
+
+
+    public Message()
+    {
+    }
+
+    public Message(Collection<Section> sections)
+    {
+        _payload.addAll(sections);
+    }
+
+    public Message(Section section)
+    {
+        this(Collections.singletonList(section));
+    }
+
+    public Message(String message)
+    {
+        this(new AmqpValue(message));
+    }
+
+
+    public Binary getDeliveryTag()
+    {
+        return _deliveryTag;
+    }
+
+    public void setDeliveryTag(Binary deliveryTag)
+    {
+        _deliveryTag = deliveryTag;
+    }
+
+    public List<Section> getPayload()
+    {
+        return Collections.unmodifiableList(_payload);
+    }
+
+    private <T extends Section> T getSection(Class<T> clazz)
+    {
+        for(Section s : _payload)
+        {
+            if(clazz.isAssignableFrom(s.getClass()))
+            {
+                return (T) s;
+            }
+        }
+        return null;
+    }
+
+    public ApplicationProperties getApplicationProperties()
+    {
+        return getSection(ApplicationProperties.class);
+    }
+
+    public Properties getProperties()
+    {
+        return getSection(Properties.class);
+    }
+
+    public Header getHeader()
+    {
+        return getSection(Header.class);
+    }
+
+
+    public void setResume(final Boolean resume)
+    {
+        _resume = resume;
+    }
+
+    public boolean isResume()
+    {
+        return Boolean.TRUE.equals(_resume);
+    }
+
+    public void setDeliveryState(DeliveryState state)
+    {
+        _deliveryState = state;
+    }
+
+    public DeliveryState getDeliveryState()
+    {
+        return _deliveryState;
+    }
+
+    public void setSettled(boolean settled)
+    {
+        _settled = settled;
+    }
+
+    public boolean getSettled()
+    {
+        return _settled;
+    }
+
+    public void setReceiver(final Receiver receiver)
+    {
+        _receiver = receiver;
+    }
+
+    public Receiver getReceiver()
+    {
+        return _receiver;
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1526202&r1=1526201&r2=1526202&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java Wed Sep 25 15:15:18 2013
@@ -1,615 +1,615 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
-
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class Receiver implements DeliveryStateHandler
-{
-    private ReceivingLinkEndpoint _endpoint;
-    private int _id;
-    private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100);
-    private Session _session;
-
-    private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
-    private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
-    private MessageArrivalListener _messageArrivalListener;
-    private org.apache.qpid.amqp_1_0.type.transport.Error _error;
-    private Runnable _remoteErrorTask;
-
-    public Receiver(final Session session,
-                    final String linkName,
-                    final Target target,
-                    final Source source,
-                    final AcknowledgeMode ackMode) throws ConnectionErrorException
-    {
-        this(session, linkName, target, source, ackMode, false);
-    }
-
-    public Receiver(final Session session,
-                    final String linkName,
-                    final Target target,
-                    final Source source,
-                    final AcknowledgeMode ackMode,
-                    boolean isDurable) throws ConnectionErrorException
-    {
-        this(session,linkName,target,source,ackMode,isDurable,null);
-    }
-
-    public Receiver(final Session session,
-                    final String linkName,
-                    final Target target,
-                    final Source source,
-                    final AcknowledgeMode ackMode,
-                    final boolean isDurable,
-                    final Map<Binary,Outcome> unsettled) throws ConnectionErrorException
-    {
-
-        session.getConnection().checkNotClosed();
-        _session = session;
-        if(isDurable)
-        {
-            source.setDurable(TerminusDurability.UNSETTLED_STATE);
-            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
-        }
-        else if(source != null)
-        {
-            source.setDurable(TerminusDurability.NONE);
-            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
-        }
-        _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source,
-                                                                      UnsignedInteger.ZERO);
-
-        _endpoint.setDeliveryStateHandler(this);
-
-        switch(ackMode)
-        {
-            case ALO:
-                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
-                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
-                break;
-            case AMO:
-                _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
-                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
-                break;
-            case EO:
-                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
-                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
-                break;
-
-        }
-
-        _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener()
-        {
-            @Override public void messageTransfer(final Transfer xfr)
-            {
-                _prefetchQueue.add(xfr);
-                postPrefetchAction();
-            }
-
-            @Override
-            public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
-            {
-                _error = detach.getError();
-                if(detach.getError()!=null)
-                {
-                    remoteError();
-                }
-                super.remoteDetached(endpoint, detach);
-            }
-        });
-
-        _endpoint.setLocalUnsettled(unsettled);
-        _endpoint.attach();
-
-        synchronized(_endpoint.getLock())
-        {
-            while(!_endpoint.isAttached() && !_endpoint.isDetached())
-            {
-                try
-                {
-                    _endpoint.getLock().wait();
-                }
-                catch (InterruptedException e)
-                {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
-            }
-        }
-
-        if(_endpoint.getSource() == null)
-        {
-            synchronized(_endpoint.getLock())
-            {
-                while(!_endpoint.isDetached())
-                {
-                    try
-                    {
-                        _endpoint.getLock().wait();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    }
-                }
-            }
-            throw new ConnectionErrorException(getError());
-        }
-        else
-        {
-
-        }
-    }
-
-    private void remoteError()
-    {
-        if(_remoteErrorTask != null)
-        {
-            _remoteErrorTask.run();
-        }
-    }
-
-    private void postPrefetchAction()
-    {
-        if(_messageArrivalListener != null)
-        {
-            _messageArrivalListener.messageArrived(this);
-        }
-    }
-
-    public void setCredit(UnsignedInteger credit, boolean window)
-    {
-        _endpoint.setLinkCredit(credit);
-        _endpoint.setCreditWindow(window);
-
-    }
-
-
-    public String getAddress()
-    {
-        return ((Source)_endpoint.getSource()).getAddress();
-    }
-
-    public Map getFilter()
-    {
-        return ((Source)_endpoint.getSource()).getFilter();
-    }
-
-    public Message receive()
-    {
-        return receive(-1L);
-    }
-
-    public Message receive(boolean wait)
-    {
-        return receive(wait ? -1L : 0L);
-    }
-
-    // 0 means no wait, -1 wait forever
-    public Message receive(long wait)
-    {
-        Message m = null;
-        Transfer xfr;
-        long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L;
-
-        while((xfr = receiveFromPrefetch(wait)) != null )
-        {
-
-            if(!Boolean.TRUE.equals(xfr.getAborted()))
-            {
-                Binary deliveryTag = xfr.getDeliveryTag();
-                Boolean resume = xfr.getResume();
-
-                List<Section> sections = new ArrayList<Section>();
-                List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
-                int totalSize = 0;
-
-                boolean hasMore;
-                do
-                {
-                    hasMore = Boolean.TRUE.equals(xfr.getMore());
-
-                    ByteBuffer buf = xfr.getPayload();
-
-                    if(buf != null)
-                    {
-
-                        totalSize += buf.remaining();
-
-                        payloads.add(buf);
-                    }
-                    if(hasMore)
-                    {
-                        xfr = receiveFromPrefetch(-1l);
-                        if(xfr== null)
-                        {
-                            // TODO - this is wrong!!!!
-                            System.out.println("eeek");
-                        }
-                    }
-                }
-                while(hasMore && !Boolean.TRUE.equals(xfr.getAborted()));
-
-                if(!Boolean.TRUE.equals(xfr.getAborted()))
-                {
-                    ByteBuffer allPayload = ByteBuffer.allocate(totalSize);
-                    for(ByteBuffer payload : payloads)
-                    {
-                        allPayload.put(payload);
-                    }
-                    allPayload.flip();
-                    SectionDecoder decoder = _session.getSectionDecoder();
-
-                    try
-                    {
-                        sections = decoder.parseAll(allPayload);
-                    }
-                    catch (AmqpErrorException e)
-                    {
-                        // todo - throw a sensible error
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    }
-                    m = new Message(sections);
-                    m.setDeliveryTag(deliveryTag);
-                    m.setResume(resume);
-                    m.setReceiver(this);
-                    break;
-                }
-            }
-
-            if(wait > 0L)
-            {
-                wait = endTime - System.currentTimeMillis();
-                if(wait <=0L)
-                {
-                    break;
-                }
-            }
-        }
-
-
-        return m;
-
-    }
-
-    private Transfer receiveFromPrefetch(long wait)
-    {
-        long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L);
-        final Object lock = _endpoint.getLock();
-        synchronized(lock)
-        {
-            Transfer xfr;
-            while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached()
-                  && wait != 0)
-            {
-                try
-                {
-                    if(wait>0L)
-                    {
-                        lock.wait(wait);
-                    }
-                    else if(wait<0L)
-                    {
-                        lock.wait();
-                    }
-                }
-                catch (InterruptedException e)
-                {
-                    return null;
-                }
-                if(wait > 0L)
-                {
-                    wait = endTime - System.currentTimeMillis();
-                    if(wait <= 0L)
-                    {
-                        break;
-                    }
-                }
-
-            }
-            if(xfr != null)
-            {
-                _prefetchQueue.poll();
-
-            }
-
-            return xfr;
-        }
-
-    }
-
-
-    public void release(final Message m)
-    {
-        release(m.getDeliveryTag());
-    }
-
-    public void release(Binary deliveryTag)
-    {
-        update(new Released(), deliveryTag, null, null);
-    }
-
-
-    public void modified(Binary tag)
-    {
-        final Modified outcome = new Modified();
-        outcome.setDeliveryFailed(true);
-
-        update(outcome, tag, null, null);
-    }
-
-    public void acknowledge(final Message m)
-    {
-        acknowledge(m.getDeliveryTag());
-    }
-
-    public void acknowledge(final Message m, SettledAction a)
-    {
-        acknowledge(m.getDeliveryTag(), a);
-    }
-
-
-    public void acknowledge(final Message m, Transaction txn)
-    {
-        acknowledge(m.getDeliveryTag(), txn);
-    }
-
-
-    public void acknowledge(final Binary deliveryTag)
-    {
-        acknowledge(deliveryTag, null, null);
-    }
-
-
-    public void acknowledge(final Binary deliveryTag, SettledAction a)
-    {
-        acknowledge(deliveryTag, null, a);
-    }
-
-    public void acknowledge(final Binary deliveryTag, final Transaction txn)
-    {
-        acknowledge(deliveryTag, txn, null);
-    }
-
-    public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action)
-    {
-        update(new Accepted(), deliveryTag, txn, action);
-    }
-
-    public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action)
-    {
-
-        DeliveryState state;
-        if(txn != null)
-        {
-            TransactionalState txnState = new TransactionalState();
-            txnState.setOutcome(outcome);
-            txnState.setTxnId(txn.getTxnId());
-            state = txnState;
-        }
-        else
-        {
-            state = (DeliveryState) outcome;
-        }
-        boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
-
-        if(!(settled || action == null))
-        {
-            _unsettledMap.put(deliveryTag, action);
-        }
-
-        _endpoint.updateDisposition(deliveryTag,state, settled);
-    }
-
-    public Error getError()
-    {
-        return _error;
-    }
-
-    public void acknowledgeAll(Message m)
-    {
-        acknowledgeAll(m.getDeliveryTag());
-    }
-
-    public void acknowledgeAll(Binary deliveryTag)
-    {
-        acknowledgeAll(deliveryTag, null, null);
-    }
-
-    public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action)
-    {
-        updateAll(new Accepted(), deliveryTag, txn, action);
-    }
-
-    public void updateAll(Outcome outcome, Binary deliveryTag)
-    {
-        updateAll(outcome, deliveryTag, null, null);
-    }
-
-    public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action)
-    {
-        DeliveryState state;
-
-        if(txn != null)
-        {
-            TransactionalState txnState = new TransactionalState();
-            txnState.setOutcome(outcome);
-            txnState.setTxnId(txn.getTxnId());
-            state = txnState;
-        }
-        else
-        {
-            state = (DeliveryState) outcome;
-        }
-        boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
-
-        if(!(settled || action == null))
-        {
-            _unsettledMap.put(deliveryTag, action);
-        }
-        _endpoint.updateAllDisposition(deliveryTag, state, settled);
-    }
-
-
-
-    public void close()
-    {
-        _endpoint.setTarget(null);
-        _endpoint.close();
-        Message msg;
-        while((msg = receive(-1l)) != null)
-        {
-            release(msg);
-        }
-
-    }
-
-
-    public void detach()
-    {
-        _endpoint.setTarget(null);
-        _endpoint.detach();
-        Message msg;
-        while((msg = receive(-1l)) != null)
-        {
-            release(msg);
-        }
-
-    }
-
-    public void drain()
-    {
-        _endpoint.drain();
-    }
-
-    /**
-     * Waits for the receiver to drain or a message to be available to be received.
-     * @return true if the receiver has been drained.
-     */
-    public boolean drainWait()
-    {
-        final Object lock = _endpoint.getLock();
-        synchronized(lock)
-        {
-            try
-            {
-                while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
-                {
-                    lock.wait();
-                }
-            }
-            catch (InterruptedException e)
-            {
-            }
-        }
-        return _prefetchQueue.peek()==null && _endpoint.isDrained();
-    }
-
-    /**
-     * Clears the receiver drain so that message delivery can resume.
-     */
-    public void clearDrain()
-    {
-        _endpoint.clearDrain();
-    }
-
-    public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
-    {
-        _endpoint.setLinkCredit(credit);
-        _endpoint.setTransactionId(txn == null ? null : txn.getTxnId());
-        _endpoint.setCreditWindow(false);
-
-    }
-
-    public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
-    {
-        if(Boolean.TRUE.equals(settled))
-        {
-            SettledAction action = _unsettledMap.remove(deliveryTag);
-            if(action != null)
-            {
-                action.onSettled(deliveryTag);
-            }
-        }
-    }
-
-    public Map<Binary, Outcome> getRemoteUnsettled()
-    {
-        return _endpoint.getInitialUnsettledMap();
-    }
-
-
-    public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener)
-    {
-        synchronized(_endpoint.getLock())
-        {
-            _messageArrivalListener = messageArrivalListener;
-            int prefetchSize = _prefetchQueue.size();
-            for(int i = 0; i < prefetchSize; i++)
-            {
-                postPrefetchAction();
-            }
-        }
-    }
-
-    public Session getSession()
-    {
-        return _session;
-    }
-
-    public org.apache.qpid.amqp_1_0.type.Source getSource()
-    {
-        return _endpoint.getSource();
-    }
-
-    public static interface SettledAction
-    {
-        public void onSettled(Binary deliveryTag);
-    }
-
-
-    public interface MessageArrivalListener
-    {
-        void messageArrived(Receiver receiver);
-    }
-
-    public void setRemoteErrorListener(Runnable listener)
-    {
-        _remoteErrorTask = listener;
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class Receiver implements DeliveryStateHandler
+{
+    private ReceivingLinkEndpoint _endpoint;
+    private int _id;
+    private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100);
+    private Session _session;
+
+    private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
+    private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
+    private MessageArrivalListener _messageArrivalListener;
+    private org.apache.qpid.amqp_1_0.type.transport.Error _error;
+    private Runnable _remoteErrorTask;
+
+    public Receiver(final Session session,
+                    final String linkName,
+                    final Target target,
+                    final Source source,
+                    final AcknowledgeMode ackMode) throws ConnectionErrorException
+    {
+        this(session, linkName, target, source, ackMode, false);
+    }
+
+    public Receiver(final Session session,
+                    final String linkName,
+                    final Target target,
+                    final Source source,
+                    final AcknowledgeMode ackMode,
+                    boolean isDurable) throws ConnectionErrorException
+    {
+        this(session,linkName,target,source,ackMode,isDurable,null);
+    }
+
+    public Receiver(final Session session,
+                    final String linkName,
+                    final Target target,
+                    final Source source,
+                    final AcknowledgeMode ackMode,
+                    final boolean isDurable,
+                    final Map<Binary,Outcome> unsettled) throws ConnectionErrorException
+    {
+
+        session.getConnection().checkNotClosed();
+        _session = session;
+        if(isDurable)
+        {
+            source.setDurable(TerminusDurability.UNSETTLED_STATE);
+            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+        }
+        else if(source != null)
+        {
+            source.setDurable(TerminusDurability.NONE);
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+        }
+        _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source,
+                                                                      UnsignedInteger.ZERO);
+
+        _endpoint.setDeliveryStateHandler(this);
+
+        switch(ackMode)
+        {
+            case ALO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+                break;
+            case AMO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
+                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+                break;
+            case EO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+                break;
+
+        }
+
+        _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener()
+        {
+            @Override public void messageTransfer(final Transfer xfr)
+            {
+                _prefetchQueue.add(xfr);
+                postPrefetchAction();
+            }
+
+            @Override
+            public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+            {
+                _error = detach.getError();
+                if(detach.getError()!=null)
+                {
+                    remoteError();
+                }
+                super.remoteDetached(endpoint, detach);
+            }
+        });
+
+        _endpoint.setLocalUnsettled(unsettled);
+        _endpoint.attach();
+
+        synchronized(_endpoint.getLock())
+        {
+            while(!_endpoint.isAttached() && !_endpoint.isDetached())
+            {
+                try
+                {
+                    _endpoint.getLock().wait();
+                }
+                catch (InterruptedException e)
+                {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+            }
+        }
+
+        if(_endpoint.getSource() == null)
+        {
+            synchronized(_endpoint.getLock())
+            {
+                while(!_endpoint.isDetached())
+                {
+                    try
+                    {
+                        _endpoint.getLock().wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    }
+                }
+            }
+            throw new ConnectionErrorException(getError());
+        }
+        else
+        {
+
+        }
+    }
+
+    /**
+     * Runs the registered remote-error task, if one has been set.
+     */
+    private void remoteError()
+    {
+        if(_remoteErrorTask == null)
+        {
+            return;
+        }
+        _remoteErrorTask.run();
+    }
+
+    /**
+     * Tells the message-arrival listener, when one is registered, that a message
+     * has arrived on this receiver.
+     */
+    private void postPrefetchAction()
+    {
+        if(_messageArrivalListener == null)
+        {
+            return;
+        }
+        _messageArrivalListener.messageArrived(this);
+    }
+
+    /**
+     * Sets the link credit on the underlying endpoint, then sets the endpoint's
+     * credit-window flag.
+     *
+     * @param credit the link credit handed to the endpoint
+     * @param window the value passed to the endpoint's credit-window setting
+     */
+    public void setCredit(UnsignedInteger credit, boolean window)
+    {
+        // the credit is applied before the window flag, as in the original ordering
+        _endpoint.setLinkCredit(credit);
+        _endpoint.setCreditWindow(window);
+    }
+
+
+    /**
+     * @return the address of the endpoint's source
+     */
+    public String getAddress()
+    {
+        final Source endpointSource = (Source) _endpoint.getSource();
+        return endpointSource.getAddress();
+    }
+
+    /**
+     * @return the filter map of the endpoint's source
+     */
+    public Map getFilter()
+    {
+        final Source endpointSource = (Source) _endpoint.getSource();
+        return endpointSource.getFilter();
+    }
+
+    /**
+     * Receives a message, waiting indefinitely (a wait of -1).
+     *
+     * @return the result of {@code receive(-1L)}
+     */
+    public Message receive()
+    {
+        final long waitForever = -1L;
+        return receive(waitForever);
+    }
+
+    /**
+     * Receives a message, either waiting indefinitely or not waiting at all.
+     *
+     * @param wait true to wait forever (-1), false for no wait (0)
+     * @return the result of the timed {@code receive(long)} call
+     */
+    public Message receive(boolean wait)
+    {
+        if(wait)
+        {
+            return receive(-1L);
+        }
+        return receive(0L);
+    }
+
+    /**
+     * Receives the next non-aborted message from the prefetch queue, concatenating the payloads
+     * of deliveries that are spread over several transfers before decoding them into sections.
+     *
+     * @param wait maximum time to wait in milliseconds; 0 means no wait, -1 wait forever
+     * @return the assembled message, or null if no transfer became available or if a
+     *         multi-transfer delivery could not be completed
+     */
+    public Message receive(long wait)
+    {
+        Message m = null;
+        Transfer xfr;
+        long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L;
+
+        while((xfr = receiveFromPrefetch(wait)) != null)
+        {
+
+            if(!Boolean.TRUE.equals(xfr.getAborted()))
+            {
+                Binary deliveryTag = xfr.getDeliveryTag();
+                Boolean resume = xfr.getResume();
+
+                List<Section> sections = new ArrayList<Section>();
+                List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
+                int totalSize = 0;
+
+                boolean hasMore;
+                do
+                {
+                    hasMore = Boolean.TRUE.equals(xfr.getMore());
+
+                    ByteBuffer buf = xfr.getPayload();
+
+                    if(buf != null)
+                    {
+                        totalSize += buf.remaining();
+                        payloads.add(buf);
+                    }
+                    if(hasMore)
+                    {
+                        xfr = receiveFromPrefetch(-1L);
+                        if(xfr == null)
+                        {
+                            // receiveFromPrefetch gave up (interrupted, or nothing queued while the
+                            // endpoint is drained/detached): the rest of this delivery is not
+                            // available, so abandon the partial payload rather than dereferencing null.
+                            return null;
+                        }
+                    }
+                }
+                while(hasMore && !Boolean.TRUE.equals(xfr.getAborted()));
+
+                if(!Boolean.TRUE.equals(xfr.getAborted()))
+                {
+                    ByteBuffer allPayload = ByteBuffer.allocate(totalSize);
+                    for(ByteBuffer payload : payloads)
+                    {
+                        allPayload.put(payload);
+                    }
+                    allPayload.flip();
+                    SectionDecoder decoder = _session.getSectionDecoder();
+
+                    try
+                    {
+                        sections = decoder.parseAll(allPayload);
+                    }
+                    catch (AmqpErrorException e)
+                    {
+                        // todo - throw a sensible error; for now the message is built with no sections
+                        e.printStackTrace();
+                    }
+                    m = new Message(sections);
+                    m.setDeliveryTag(deliveryTag);
+                    m.setResume(resume);
+                    m.setReceiver(this);
+                    break;
+                }
+            }
+
+            // an aborted delivery was skipped: shrink a finite wait to the time still remaining
+            if(wait > 0L)
+            {
+                wait = endTime - System.currentTimeMillis();
+                if(wait <= 0L)
+                {
+                    break;
+                }
+            }
+        }
+
+        return m;
+    }
+
+    /**
+     * Takes the next transfer from the prefetch queue, optionally waiting on the endpoint lock
+     * until one is queued, the endpoint is drained or detached, or the wait expires.
+     *
+     * @param wait maximum time to wait in milliseconds; 0 means no wait, negative means wait
+     *             without a time limit
+     * @return the transfer removed from the queue, or null if none was available or the
+     *         waiting thread was interrupted (the interrupt status is restored in that case)
+     */
+    private Transfer receiveFromPrefetch(long wait)
+    {
+        final long endTime = (wait > 0L) ? (System.currentTimeMillis() + wait) : 0L;
+        final Object lock = _endpoint.getLock();
+        synchronized(lock)
+        {
+            Transfer xfr;
+            while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached()
+                  && wait != 0L)
+            {
+                try
+                {
+                    if(wait > 0L)
+                    {
+                        lock.wait(wait);
+                    }
+                    else
+                    {
+                        lock.wait();
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                    // keep the interrupt visible to callers instead of silently consuming it
+                    Thread.currentThread().interrupt();
+                    return null;
+                }
+                if(wait > 0L)
+                {
+                    // Clamp to 0 rather than breaking out: the loop condition then peeks one last
+                    // time, so a transfer that arrived during the final wait is still returned,
+                    // and a negative remainder can never be mistaken for "wait forever".
+                    wait = Math.max(0L, endTime - System.currentTimeMillis());
+                }
+            }
+            if(xfr != null)
+            {
+                _prefetchQueue.poll();
+            }
+
+            return xfr;
+        }
+    }
+
+
+    /**
+     * Releases the delivery identified by the given message's delivery tag.
+     *
+     * @param m the message whose delivery tag is released
+     */
+    public void release(final Message m)
+    {
+        final Binary tag = m.getDeliveryTag();
+        release(tag);
+    }
+
+    /**
+     * Updates the delivery with a {@code Released} outcome, with no transaction
+     * and no settled action.
+     *
+     * @param deliveryTag tag of the delivery to release
+     */
+    public void release(Binary deliveryTag)
+    {
+        final Released outcome = new Released();
+        update(outcome, deliveryTag, null, null);
+    }
+
+
+    /**
+     * Updates the delivery with a {@code Modified} outcome whose delivery-failed
+     * flag is set, with no transaction and no settled action.
+     *
+     * @param tag tag of the delivery to update
+     */
+    public void modified(Binary tag)
+    {
+        final Modified failedDelivery = new Modified();
+        failedDelivery.setDeliveryFailed(true);
+        update(failedDelivery, tag, null, null);
+    }
+
+    /**
+     * Acknowledges the delivery identified by the given message's delivery tag.
+     *
+     * @param m the message to acknowledge
+     */
+    public void acknowledge(final Message m)
+    {
+        final Binary tag = m.getDeliveryTag();
+        acknowledge(tag);
+    }
+
+    /**
+     * Acknowledges the delivery identified by the message's delivery tag, passing
+     * along a settled action.
+     *
+     * @param m the message to acknowledge
+     * @param a the settled action forwarded with the acknowledgement
+     */
+    public void acknowledge(final Message m, SettledAction a)
+    {
+        final Binary tag = m.getDeliveryTag();
+        acknowledge(tag, a);
+    }
+
+
+    /**
+     * Acknowledges the delivery identified by the message's delivery tag, passing
+     * along a transaction.
+     *
+     * @param m   the message to acknowledge
+     * @param txn the transaction forwarded with the acknowledgement
+     */
+    public void acknowledge(final Message m, Transaction txn)
+    {
+        final Binary tag = m.getDeliveryTag();
+        acknowledge(tag, txn);
+    }
+
+
+    /**
+     * Acknowledges the delivery with no transaction and no settled action.
+     *
+     * @param deliveryTag tag of the delivery to acknowledge
+     */
+    public void acknowledge(final Binary deliveryTag)
+    {
+        final Transaction noTransaction = null;
+        final SettledAction noAction = null;
+        acknowledge(deliveryTag, noTransaction, noAction);
+    }
+
+
+    /**
+     * Acknowledges the delivery with no transaction, passing along a settled action.
+     *
+     * @param deliveryTag tag of the delivery to acknowledge
+     * @param a           the settled action forwarded with the acknowledgement
+     */
+    public void acknowledge(final Binary deliveryTag, SettledAction a)
+    {
+        final Transaction noTransaction = null;
+        acknowledge(deliveryTag, noTransaction, a);
+    }
+
+    /**
+     * Acknowledges the delivery under the given transaction, with no settled action.
+     *
+     * @param deliveryTag tag of the delivery to acknowledge
+     * @param txn         the transaction forwarded with the acknowledgement
+     */
+    public void acknowledge(final Binary deliveryTag, final Transaction txn)
+    {
+        final SettledAction noAction = null;
+        acknowledge(deliveryTag, txn, noAction);
+    }
+
+    /**
+     * Updates the delivery with an {@code Accepted} outcome.
+     *
+     * @param deliveryTag tag of the delivery to acknowledge
+     * @param txn         transaction passed to {@code update}; may be null
+     * @param action      settled action passed to {@code update}; may be null
+     */
+    public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action)
+    {
+        final Accepted accepted = new Accepted();
+        update(accepted, deliveryTag, txn, action);
+    }
+
+    /**
+     * Sends a disposition update for a single delivery.
+     * <p>
+     * When a transaction is supplied, the outcome is wrapped in a {@code TransactionalState}
+     * carrying the transaction id; otherwise the outcome itself is used as the delivery state.
+     * The update is sent as settled only when there is no transaction and the endpoint's
+     * receiving settlement mode is not {@code SECOND}. For unsettled updates a non-null action
+     * is stored in the unsettled map under the delivery tag.
+     *
+     * @param outcome     the outcome to report
+     * @param deliveryTag tag of the delivery being updated
+     * @param txn         transaction to associate with the outcome, or null
+     * @param action      action to remember for an unsettled update, or null
+     */
+    public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action)
+    {
+        final DeliveryState state;
+        if(txn == null)
+        {
+            state = (DeliveryState) outcome;
+        }
+        else
+        {
+            final TransactionalState transactional = new TransactionalState();
+            transactional.setOutcome(outcome);
+            transactional.setTxnId(txn.getTxnId());
+            state = transactional;
+        }
+
+        final boolean settled =
+                (txn == null) && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
+
+        if(!settled && action != null)
+        {
+            _unsettledMap.put(deliveryTag, action);
+        }
+
+        _endpoint.updateDisposition(deliveryTag, state, settled);
+    }
+
+    public Error getError()
+    {
+        return _error;
+    }
+
+    /**
+     * Calls {@code acknowledgeAll} with the given message's delivery tag.
+     *
+     * @param m the message whose delivery tag is used
+     */
+    public void acknowledgeAll(Message m)
+    {
+        final Binary tag = m.getDeliveryTag();
+        acknowledgeAll(tag);
+    }
+
+    /**
+     * Calls the three-argument {@code acknowledgeAll} with no transaction and no
+     * settled action.
+     *
+     * @param deliveryTag the delivery tag forwarded to the full overload
+     */
+    public void acknowledgeAll(Binary deliveryTag)
+    {
+        final Transaction noTransaction = null;
+        final SettledAction noAction = null;
+        acknowledgeAll(deliveryTag, noTransaction, noAction);
+    }
+
+    /**
+     * Calls {@code updateAll} with an {@code Accepted} outcome.
+     *
+     * @param deliveryTag the delivery tag passed to {@code updateAll}
+     * @param txn         transaction passed to {@code updateAll}; may be null
+     * @param action      settled action passed to {@code updateAll}; may be null
+     */
+    public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action)
+    {
+        final Accepted accepted = new Accepted();
+        updateAll(accepted, deliveryTag, txn, action);
+    }
+
+    /**
+     * Calls the four-argument {@code updateAll} with no transaction and no settled action.
+     *
+     * @param outcome     the outcome to report
+     * @param deliveryTag the delivery tag forwarded to the full overload
+     */
+    public void updateAll(Outcome outcome, Binary deliveryTag)
+    {
+        final Transaction noTransaction = null;
+        final SettledAction noAction = null;
+        updateAll(outcome, deliveryTag, noTransaction, noAction);
+    }
+
+    /**
+     * Sends a disposition update through the endpoint's {@code updateAllDisposition}.
+     * <p>
+     * When a transaction is supplied, the outcome is wrapped in a {@code TransactionalState}
+     * carrying the transaction id; otherwise the outcome itself is used as the delivery state.
+     * The update is sent as settled only when there is no transaction and the endpoint's
+     * receiving settlement mode is not {@code SECOND}. For unsettled updates a non-null action
+     * is stored in the unsettled map under the delivery tag.
+     *
+     * @param outcome     the outcome to report
+     * @param deliveryTag the delivery tag passed to the endpoint
+     * @param txn         transaction to associate with the outcome, or null
+     * @param action      action to remember for an unsettled update, or null
+     */
+    public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action)
+    {
+        final DeliveryState state;
+        if(txn == null)
+        {
+            state = (DeliveryState) outcome;
+        }
+        else
+        {
+            final TransactionalState transactional = new TransactionalState();
+            transactional.setOutcome(outcome);
+            transactional.setTxnId(txn.getTxnId());
+            state = transactional;
+        }
+
+        final boolean settled =
+                (txn == null) && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
+
+        if(!settled && action != null)
+        {
+            _unsettledMap.put(deliveryTag, action);
+        }
+
+        _endpoint.updateAllDisposition(deliveryTag, state, settled);
+    }
+
+
+
+    /**
+     * Clears the endpoint's target, closes the endpoint, and then releases every
+     * message that {@code receive(-1)} still returns.
+     */
+    public void close()
+    {
+        _endpoint.setTarget(null);
+        _endpoint.close();
+
+        // hand back whatever is still obtainable from the prefetch queue
+        for(Message pending = receive(-1l); pending != null; pending = receive(-1l))
+        {
+            release(pending);
+        }
+    }
+
+
+    /**
+     * Clears the endpoint's target, detaches the endpoint, and then releases every
+     * message that {@code receive(-1)} still returns.
+     */
+    public void detach()
+    {
+        _endpoint.setTarget(null);
+        _endpoint.detach();
+
+        // hand back whatever is still obtainable from the prefetch queue
+        for(Message pending = receive(-1l); pending != null; pending = receive(-1l))
+        {
+            release(pending);
+        }
+    }
+
+    /**
+     * Delegates to the underlying endpoint's {@code drain()}.
+     */
+    public void drain()
+    {
+        this._endpoint.drain();
+    }
+
+    /**
+     * Waits for the receiver to drain or a message to be available to be received.
+     * Waiting also ends when the endpoint is detached or the calling thread is interrupted;
+     * on interruption the thread's interrupt status is restored.
+     *
+     * @return true if the prefetch queue is empty and the receiver has been drained.
+     */
+    public boolean drainWait()
+    {
+        final Object lock = _endpoint.getLock();
+        synchronized(lock)
+        {
+            try
+            {
+                while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
+                {
+                    lock.wait();
+                }
+            }
+            catch (InterruptedException e)
+            {
+                // stop waiting, but leave the interrupt visible to the caller
+                Thread.currentThread().interrupt();
+            }
+            // evaluated while still holding the lock so the answer matches the state just waited on
+            return _prefetchQueue.peek()==null && _endpoint.isDrained();
+        }
+    }
+
+    /**
+     * Clears the drain state on the underlying endpoint so that message delivery
+     * can resume.
+     */
+    public void clearDrain()
+    {
+        this._endpoint.clearDrain();
+    }
+
+    /**
+     * Sets the link credit and the transaction id on the endpoint, and switches the
+     * endpoint's credit window off.
+     *
+     * @param credit the link credit handed to the endpoint
+     * @param txn    transaction whose id is set on the endpoint; null clears the id
+     */
+    public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
+    {
+        _endpoint.setLinkCredit(credit);
+        if(txn == null)
+        {
+            _endpoint.setTransactionId(null);
+        }
+        else
+        {
+            _endpoint.setTransactionId(txn.getTxnId());
+        }
+        _endpoint.setCreditWindow(false);
+    }
+
+    /**
+     * Delivery-state callback: when the delivery is reported as settled, removes the
+     * action stored for its tag and, if there was one, invokes it.
+     *
+     * @param deliveryTag tag of the delivery the state refers to
+     * @param state       the reported delivery state (not inspected here)
+     * @param settled     whether the delivery is settled; anything but TRUE is ignored
+     */
+    public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+    {
+        if(!Boolean.TRUE.equals(settled))
+        {
+            return;
+        }
+
+        final SettledAction pendingAction = _unsettledMap.remove(deliveryTag);
+        if(pendingAction != null)
+        {
+            pendingAction.onSettled(deliveryTag);
+        }
+    }
+
+    /**
+     * @return the endpoint's initial unsettled map
+     */
+    public Map<Binary, Outcome> getRemoteUnsettled()
+    {
+        final Map<Binary, Outcome> initialUnsettled = _endpoint.getInitialUnsettledMap();
+        return initialUnsettled;
+    }
+
+
+    /**
+     * Installs the message-arrival listener and, while holding the endpoint lock,
+     * runs the arrival notification once for each transfer currently in the prefetch queue.
+     *
+     * @param messageArrivalListener the listener to install
+     */
+    public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener)
+    {
+        synchronized(_endpoint.getLock())
+        {
+            _messageArrivalListener = messageArrivalListener;
+
+            // the count is captured once, before any notification is made
+            int remaining = _prefetchQueue.size();
+            while(remaining-- > 0)
+            {
+                postPrefetchAction();
+            }
+        }
+    }
+
+    /**
+     * @return the session held by this receiver
+     */
+    public Session getSession()
+    {
+        final Session owningSession = _session;
+        return owningSession;
+    }
+
+    /**
+     * @return the source reported by the underlying endpoint
+     */
+    public org.apache.qpid.amqp_1_0.type.Source getSource()
+    {
+        final org.apache.qpid.amqp_1_0.type.Source endpointSource = _endpoint.getSource();
+        return endpointSource;
+    }
+
+    /**
+     * Callback invoked with the delivery tag once a delivery is reported as settled.
+     */
+    public interface SettledAction
+    {
+        /**
+         * @param deliveryTag tag of the delivery that was settled
+         */
+        void onSettled(Binary deliveryTag);
+    }
+
+
+    /**
+     * Listener notified when a message arrives on a receiver.
+     */
+    public interface MessageArrivalListener
+    {
+        /**
+         * @param receiver the receiver on which the message arrived
+         */
+        void messageArrived(Receiver receiver);
+    }
+
+    /**
+     * Registers the task that is run when a remote error is reported.
+     *
+     * @param listener the task to store; null removes any existing task
+     */
+    public void setRemoteErrorListener(Runnable listener)
+    {
+        this._remoteErrorTask = listener;
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org