You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/25 17:15:18 UTC
svn commit: r1526202 [2/3] - in /qpid/trunk/qpid/java/amqp-1-0-client:
example/src/main/java/org/apache/qpid/amqp_1_0/client/
src/main/java/org/apache/qpid/amqp_1_0/client/
Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java?rev=1526202&r1=1526201&r2=1526202&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java Wed Sep 25 15:15:18 2013
@@ -1,412 +1,412 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import javax.net.ssl.SSLSocketFactory;
-import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
-import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.qpid.amqp_1_0.transport.StateChangeListener;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-
-public class Connection
-{
- private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
- private static final int MAX_FRAME_SIZE = 65536;
-
- private String _address;
- private ConnectionEndpoint _conn;
- private int _sessionCount;
-
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password, String remoteHostname) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,new Container());
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container) throws ConnectionException
- {
- this(address,port,username,password,MAX_FRAME_SIZE,container);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,container, null);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container,
- final String remoteHostname) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,container,remoteHostname,false);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container,
- final boolean ssl) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final String remoteHost,
- final boolean ssl) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container,
- final String remoteHost,
- final boolean ssl) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl);
- }
-
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container,
- final String remoteHostname, boolean ssl) throws ConnectionException
- {
-
- _address = address;
-
- try
- {
- final Socket s;
- if(ssl)
- {
- s = SSLSocketFactory.getDefault().createSocket(address, port);
- }
- else
- {
- s = new Socket(address, port);
- }
-
-
- Principal principal = username == null ? null : new Principal()
- {
-
- public String getName()
- {
- return username;
- }
- };
- _conn = new ConnectionEndpoint(container, principal, password);
- _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
- _conn.setRemoteAddress(s.getRemoteSocketAddress());
- _conn.setRemoteHostname(remoteHostname);
-
-
-
- ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
-
-
- final OutputStream outputStream = s.getOutputStream();
-
- ConnectionHandler.BytesSource src;
-
- if(_conn.requiresSASL())
- {
- ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
-
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)3,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
- new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
- );
-
- _conn.setSaslFrameOutput(saslOut);
- }
- else
- {
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
- );
- }
-
-
- ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
- Thread outputThread = new Thread(outputHandler);
- outputThread.setDaemon(true);
- outputThread.start();
- _conn.setFrameOutputHandler(out);
-
-
-
- final ConnectionHandler handler = new ConnectionHandler(_conn);
- final InputStream inputStream = s.getInputStream();
-
- Thread inputThread = new Thread(new Runnable()
- {
-
- public void run()
- {
- try
- {
- doRead(handler, inputStream);
- }
- finally
- {
- if(_conn.closedForInput() && _conn.closedForOutput())
- {
- try
- {
- s.close();
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
- });
-
- inputThread.setDaemon(true);
- inputThread.start();
-
- _conn.open();
-
- }
- catch (IOException e)
- {
- throw new ConnectionException(e);
- }
-
-
- }
-
- private Connection(ConnectionEndpoint endpoint)
- {
- _conn = endpoint;
- }
-
-
- private void doRead(final AMQPTransport transport, final InputStream inputStream)
- {
- byte[] buf = new byte[2<<15];
- ByteBuffer bbuf = ByteBuffer.wrap(buf);
- final Object lock = new Object();
- transport.setInputStateChangeListener(new StateChangeListener(){
-
- public void onStateChange(final boolean active)
- {
- synchronized(lock)
- {
- lock.notifyAll();
- }
- }
- });
-
- try
- {
- int read;
- while((read = inputStream.read(buf)) != -1)
- {
- bbuf.position(0);
- bbuf.limit(read);
-
- while(bbuf.hasRemaining() && transport.isOpenForInput())
- {
- transport.processBytes(bbuf);
- }
-
-
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
-
- }
-
- public Session createSession() throws ConnectionException
- {
- checkNotClosed();
- Session session = new Session(this,String.valueOf(_sessionCount++));
- return session;
- }
-
- void checkNotClosed() throws ConnectionClosedException
- {
- if(getEndpoint().isClosed())
- {
- throw new ConnectionClosedException(getEndpoint().getRemoteError());
- }
- }
-
- public ConnectionEndpoint getEndpoint()
- {
- return _conn;
- }
-
- public void awaitOpen()
- {
- synchronized(getEndpoint().getLock())
- {
- while(!getEndpoint().isOpen() && !getEndpoint().isClosed())
- {
- try
- {
- getEndpoint().getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- }
-
- private void doRead(final ConnectionHandler handler, final InputStream inputStream)
- {
- byte[] buf = new byte[2<<15];
-
-
- try
- {
- int read;
- boolean done = false;
- while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
- {
- ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
- Binary b = new Binary(buf,0,read);
-
- if(RAW_LOGGER.isLoggable(Level.FINE))
- {
- RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
- }
- while(bbuf.hasRemaining() && !handler.isDone())
- {
- handler.parse(bbuf);
- }
-
-
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
-
- public void close()
- {
- _conn.close();
-
- synchronized (_conn.getLock())
- {
- while(!_conn.closedForInput())
- {
- try
- {
- _conn.getLock().wait();
- }
- catch (InterruptedException e)
- {
-
- }
- }
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
+import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.StateChangeListener;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+
+public class Connection
+{
+ private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
+ private static final int MAX_FRAME_SIZE = 65536;
+
+ private String _address;
+ private ConnectionEndpoint _conn;
+ private int _sessionCount;
+
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password, String remoteHostname) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize) throws ConnectionException
+ {
+ this(address,port,username,password,maxFrameSize,new Container());
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final Container container) throws ConnectionException
+ {
+ this(address,port,username,password,MAX_FRAME_SIZE,container);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container) throws ConnectionException
+ {
+ this(address,port,username,password,maxFrameSize,container, null);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container,
+ final String remoteHostname) throws ConnectionException
+ {
+ this(address,port,username,password,maxFrameSize,container,remoteHostname,false);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final Container container,
+ final boolean ssl) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final String remoteHost,
+ final boolean ssl) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final Container container,
+ final String remoteHost,
+ final boolean ssl) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl);
+ }
+
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container,
+ final String remoteHostname, boolean ssl) throws ConnectionException
+ {
+
+ _address = address;
+
+ try
+ {
+ final Socket s;
+ if(ssl)
+ {
+ s = SSLSocketFactory.getDefault().createSocket(address, port);
+ }
+ else
+ {
+ s = new Socket(address, port);
+ }
+
+
+ Principal principal = username == null ? null : new Principal()
+ {
+
+ public String getName()
+ {
+ return username;
+ }
+ };
+ _conn = new ConnectionEndpoint(container, principal, password);
+ _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
+ _conn.setRemoteAddress(s.getRemoteSocketAddress());
+ _conn.setRemoteHostname(remoteHostname);
+
+
+
+ ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
+
+
+ final OutputStream outputStream = s.getOutputStream();
+
+ ConnectionHandler.BytesSource src;
+
+ if(_conn.requiresSASL())
+ {
+ ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
+
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)3,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
+ new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ );
+
+ _conn.setSaslFrameOutput(saslOut);
+ }
+ else
+ {
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ );
+ }
+
+
+ ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
+ Thread outputThread = new Thread(outputHandler);
+ outputThread.setDaemon(true);
+ outputThread.start();
+ _conn.setFrameOutputHandler(out);
+
+
+
+ final ConnectionHandler handler = new ConnectionHandler(_conn);
+ final InputStream inputStream = s.getInputStream();
+
+ Thread inputThread = new Thread(new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ doRead(handler, inputStream);
+ }
+ finally
+ {
+ if(_conn.closedForInput() && _conn.closedForOutput())
+ {
+ try
+ {
+ s.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ }
+ });
+
+ inputThread.setDaemon(true);
+ inputThread.start();
+
+ _conn.open();
+
+ }
+ catch (IOException e)
+ {
+ throw new ConnectionException(e);
+ }
+
+
+ }
+
+ private Connection(ConnectionEndpoint endpoint)
+ {
+ _conn = endpoint;
+ }
+
+
+ private void doRead(final AMQPTransport transport, final InputStream inputStream)
+ {
+ byte[] buf = new byte[2<<15];
+ ByteBuffer bbuf = ByteBuffer.wrap(buf);
+ final Object lock = new Object();
+ transport.setInputStateChangeListener(new StateChangeListener(){
+
+ public void onStateChange(final boolean active)
+ {
+ synchronized(lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ });
+
+ try
+ {
+ int read;
+ while((read = inputStream.read(buf)) != -1)
+ {
+ bbuf.position(0);
+ bbuf.limit(read);
+
+ while(bbuf.hasRemaining() && transport.isOpenForInput())
+ {
+ transport.processBytes(bbuf);
+ }
+
+
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ public Session createSession() throws ConnectionException
+ {
+ checkNotClosed();
+ Session session = new Session(this,String.valueOf(_sessionCount++));
+ return session;
+ }
+
+ void checkNotClosed() throws ConnectionClosedException
+ {
+ if(getEndpoint().isClosed())
+ {
+ throw new ConnectionClosedException(getEndpoint().getRemoteError());
+ }
+ }
+
+ public ConnectionEndpoint getEndpoint()
+ {
+ return _conn;
+ }
+
+ public void awaitOpen()
+ {
+ synchronized(getEndpoint().getLock())
+ {
+ while(!getEndpoint().isOpen() && !getEndpoint().isClosed())
+ {
+ try
+ {
+ getEndpoint().getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ }
+
+ private void doRead(final ConnectionHandler handler, final InputStream inputStream)
+ {
+ byte[] buf = new byte[2<<15];
+
+
+ try
+ {
+ int read;
+ boolean done = false;
+ while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
+ {
+ ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
+ Binary b = new Binary(buf,0,read);
+
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
+ }
+ while(bbuf.hasRemaining() && !handler.isDone())
+ {
+ handler.parse(bbuf);
+ }
+
+
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public void close()
+ {
+ _conn.close();
+
+ synchronized (_conn.getLock())
+ {
+ while(!_conn.closedForInput())
+ {
+ try
+ {
+ _conn.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ }
+ }
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java?rev=1526202&r1=1526201&r2=1526202&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java Wed Sep 25 15:15:18 2013
@@ -1,148 +1,148 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-public class Message
-{
- private Binary _deliveryTag;
- private List<Section> _payload = new ArrayList<Section>();
- private Boolean _resume;
- private boolean _settled;
- private DeliveryState _deliveryState;
- private Receiver _receiver;
-
-
- public Message()
- {
- }
-
- public Message(Collection<Section> sections)
- {
- _payload.addAll(sections);
- }
-
- public Message(Section section)
- {
- this(Collections.singletonList(section));
- }
-
- public Message(String message)
- {
- this(new AmqpValue(message));
- }
-
-
- public Binary getDeliveryTag()
- {
- return _deliveryTag;
- }
-
- public void setDeliveryTag(Binary deliveryTag)
- {
- _deliveryTag = deliveryTag;
- }
-
- public List<Section> getPayload()
- {
- return Collections.unmodifiableList(_payload);
- }
-
- private <T extends Section> T getSection(Class<T> clazz)
- {
- for(Section s : _payload)
- {
- if(clazz.isAssignableFrom(s.getClass()))
- {
- return (T) s;
- }
- }
- return null;
- }
-
- public ApplicationProperties getApplicationProperties()
- {
- return getSection(ApplicationProperties.class);
- }
-
- public Properties getProperties()
- {
- return getSection(Properties.class);
- }
-
- public Header getHeader()
- {
- return getSection(Header.class);
- }
-
-
- public void setResume(final Boolean resume)
- {
- _resume = resume;
- }
-
- public boolean isResume()
- {
- return Boolean.TRUE.equals(_resume);
- }
-
- public void setDeliveryState(DeliveryState state)
- {
- _deliveryState = state;
- }
-
- public DeliveryState getDeliveryState()
- {
- return _deliveryState;
- }
-
- public void setSettled(boolean settled)
- {
- _settled = settled;
- }
-
- public boolean getSettled()
- {
- return _settled;
- }
-
- public void setReceiver(final Receiver receiver)
- {
- _receiver = receiver;
- }
-
- public Receiver getReceiver()
- {
- return _receiver;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class Message
+{
+ private Binary _deliveryTag;
+ private List<Section> _payload = new ArrayList<Section>();
+ private Boolean _resume;
+ private boolean _settled;
+ private DeliveryState _deliveryState;
+ private Receiver _receiver;
+
+
+ public Message()
+ {
+ }
+
+ public Message(Collection<Section> sections)
+ {
+ _payload.addAll(sections);
+ }
+
+ public Message(Section section)
+ {
+ this(Collections.singletonList(section));
+ }
+
+ public Message(String message)
+ {
+ this(new AmqpValue(message));
+ }
+
+
+ public Binary getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+ public void setDeliveryTag(Binary deliveryTag)
+ {
+ _deliveryTag = deliveryTag;
+ }
+
+ public List<Section> getPayload()
+ {
+ return Collections.unmodifiableList(_payload);
+ }
+
+ private <T extends Section> T getSection(Class<T> clazz)
+ {
+ for(Section s : _payload)
+ {
+ if(clazz.isAssignableFrom(s.getClass()))
+ {
+ return (T) s;
+ }
+ }
+ return null;
+ }
+
+ public ApplicationProperties getApplicationProperties()
+ {
+ return getSection(ApplicationProperties.class);
+ }
+
+ public Properties getProperties()
+ {
+ return getSection(Properties.class);
+ }
+
+ public Header getHeader()
+ {
+ return getSection(Header.class);
+ }
+
+
+ public void setResume(final Boolean resume)
+ {
+ _resume = resume;
+ }
+
+ public boolean isResume()
+ {
+ return Boolean.TRUE.equals(_resume);
+ }
+
+ public void setDeliveryState(DeliveryState state)
+ {
+ _deliveryState = state;
+ }
+
+ public DeliveryState getDeliveryState()
+ {
+ return _deliveryState;
+ }
+
+ public void setSettled(boolean settled)
+ {
+ _settled = settled;
+ }
+
+ public boolean getSettled()
+ {
+ return _settled;
+ }
+
+ public void setReceiver(final Receiver receiver)
+ {
+ _receiver = receiver;
+ }
+
+ public Receiver getReceiver()
+ {
+ return _receiver;
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1526202&r1=1526201&r2=1526202&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java Wed Sep 25 15:15:18 2013
@@ -1,615 +1,615 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
-
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class Receiver implements DeliveryStateHandler
-{
- private ReceivingLinkEndpoint _endpoint;
- private int _id;
- private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100);
- private Session _session;
-
- private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
- private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
- private MessageArrivalListener _messageArrivalListener;
- private org.apache.qpid.amqp_1_0.type.transport.Error _error;
- private Runnable _remoteErrorTask;
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode) throws ConnectionErrorException
- {
- this(session, linkName, target, source, ackMode, false);
- }
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode,
- boolean isDurable) throws ConnectionErrorException
- {
- this(session,linkName,target,source,ackMode,isDurable,null);
- }
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode,
- final boolean isDurable,
- final Map<Binary,Outcome> unsettled) throws ConnectionErrorException
- {
-
- session.getConnection().checkNotClosed();
- _session = session;
- if(isDurable)
- {
- source.setDurable(TerminusDurability.UNSETTLED_STATE);
- source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
- }
- else if(source != null)
- {
- source.setDurable(TerminusDurability.NONE);
- source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
- }
- _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source,
- UnsignedInteger.ZERO);
-
- _endpoint.setDeliveryStateHandler(this);
-
- switch(ackMode)
- {
- case ALO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case AMO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case EO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
- break;
-
- }
-
- _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener()
- {
- @Override public void messageTransfer(final Transfer xfr)
- {
- _prefetchQueue.add(xfr);
- postPrefetchAction();
- }
-
- @Override
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- _error = detach.getError();
- if(detach.getError()!=null)
- {
- remoteError();
- }
- super.remoteDetached(endpoint, detach);
- }
- });
-
- _endpoint.setLocalUnsettled(unsettled);
- _endpoint.attach();
-
- synchronized(_endpoint.getLock())
- {
- while(!_endpoint.isAttached() && !_endpoint.isDetached())
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- if(_endpoint.getSource() == null)
- {
- synchronized(_endpoint.getLock())
- {
- while(!_endpoint.isDetached())
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- throw new ConnectionErrorException(getError());
- }
- else
- {
-
- }
- }
-
- private void remoteError()
- {
- if(_remoteErrorTask != null)
- {
- _remoteErrorTask.run();
- }
- }
-
- private void postPrefetchAction()
- {
- if(_messageArrivalListener != null)
- {
- _messageArrivalListener.messageArrived(this);
- }
- }
-
- public void setCredit(UnsignedInteger credit, boolean window)
- {
- _endpoint.setLinkCredit(credit);
- _endpoint.setCreditWindow(window);
-
- }
-
-
- public String getAddress()
- {
- return ((Source)_endpoint.getSource()).getAddress();
- }
-
- public Map getFilter()
- {
- return ((Source)_endpoint.getSource()).getFilter();
- }
-
- public Message receive()
- {
- return receive(-1L);
- }
-
- public Message receive(boolean wait)
- {
- return receive(wait ? -1L : 0L);
- }
-
- // 0 means no wait, -1 wait forever
- public Message receive(long wait)
- {
- Message m = null;
- Transfer xfr;
- long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L;
-
- while((xfr = receiveFromPrefetch(wait)) != null )
- {
-
- if(!Boolean.TRUE.equals(xfr.getAborted()))
- {
- Binary deliveryTag = xfr.getDeliveryTag();
- Boolean resume = xfr.getResume();
-
- List<Section> sections = new ArrayList<Section>();
- List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
- int totalSize = 0;
-
- boolean hasMore;
- do
- {
- hasMore = Boolean.TRUE.equals(xfr.getMore());
-
- ByteBuffer buf = xfr.getPayload();
-
- if(buf != null)
- {
-
- totalSize += buf.remaining();
-
- payloads.add(buf);
- }
- if(hasMore)
- {
- xfr = receiveFromPrefetch(-1l);
- if(xfr== null)
- {
- // TODO - this is wrong!!!!
- System.out.println("eeek");
- }
- }
- }
- while(hasMore && !Boolean.TRUE.equals(xfr.getAborted()));
-
- if(!Boolean.TRUE.equals(xfr.getAborted()))
- {
- ByteBuffer allPayload = ByteBuffer.allocate(totalSize);
- for(ByteBuffer payload : payloads)
- {
- allPayload.put(payload);
- }
- allPayload.flip();
- SectionDecoder decoder = _session.getSectionDecoder();
-
- try
- {
- sections = decoder.parseAll(allPayload);
- }
- catch (AmqpErrorException e)
- {
- // todo - throw a sensible error
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- m = new Message(sections);
- m.setDeliveryTag(deliveryTag);
- m.setResume(resume);
- m.setReceiver(this);
- break;
- }
- }
-
- if(wait > 0L)
- {
- wait = endTime - System.currentTimeMillis();
- if(wait <=0L)
- {
- break;
- }
- }
- }
-
-
- return m;
-
- }
-
- private Transfer receiveFromPrefetch(long wait)
- {
- long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L);
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- Transfer xfr;
- while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached()
- && wait != 0)
- {
- try
- {
- if(wait>0L)
- {
- lock.wait(wait);
- }
- else if(wait<0L)
- {
- lock.wait();
- }
- }
- catch (InterruptedException e)
- {
- return null;
- }
- if(wait > 0L)
- {
- wait = endTime - System.currentTimeMillis();
- if(wait <= 0L)
- {
- break;
- }
- }
-
- }
- if(xfr != null)
- {
- _prefetchQueue.poll();
-
- }
-
- return xfr;
- }
-
- }
-
-
- public void release(final Message m)
- {
- release(m.getDeliveryTag());
- }
-
- public void release(Binary deliveryTag)
- {
- update(new Released(), deliveryTag, null, null);
- }
-
-
- public void modified(Binary tag)
- {
- final Modified outcome = new Modified();
- outcome.setDeliveryFailed(true);
-
- update(outcome, tag, null, null);
- }
-
- public void acknowledge(final Message m)
- {
- acknowledge(m.getDeliveryTag());
- }
-
- public void acknowledge(final Message m, SettledAction a)
- {
- acknowledge(m.getDeliveryTag(), a);
- }
-
-
- public void acknowledge(final Message m, Transaction txn)
- {
- acknowledge(m.getDeliveryTag(), txn);
- }
-
-
- public void acknowledge(final Binary deliveryTag)
- {
- acknowledge(deliveryTag, null, null);
- }
-
-
- public void acknowledge(final Binary deliveryTag, SettledAction a)
- {
- acknowledge(deliveryTag, null, a);
- }
-
- public void acknowledge(final Binary deliveryTag, final Transaction txn)
- {
- acknowledge(deliveryTag, txn, null);
- }
-
- public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- update(new Accepted(), deliveryTag, txn, action);
- }
-
- public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action)
- {
-
- DeliveryState state;
- if(txn != null)
- {
- TransactionalState txnState = new TransactionalState();
- txnState.setOutcome(outcome);
- txnState.setTxnId(txn.getTxnId());
- state = txnState;
- }
- else
- {
- state = (DeliveryState) outcome;
- }
- boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
-
- if(!(settled || action == null))
- {
- _unsettledMap.put(deliveryTag, action);
- }
-
- _endpoint.updateDisposition(deliveryTag,state, settled);
- }
-
- public Error getError()
- {
- return _error;
- }
-
- public void acknowledgeAll(Message m)
- {
- acknowledgeAll(m.getDeliveryTag());
- }
-
- public void acknowledgeAll(Binary deliveryTag)
- {
- acknowledgeAll(deliveryTag, null, null);
- }
-
- public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- updateAll(new Accepted(), deliveryTag, txn, action);
- }
-
- public void updateAll(Outcome outcome, Binary deliveryTag)
- {
- updateAll(outcome, deliveryTag, null, null);
- }
-
- public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- DeliveryState state;
-
- if(txn != null)
- {
- TransactionalState txnState = new TransactionalState();
- txnState.setOutcome(outcome);
- txnState.setTxnId(txn.getTxnId());
- state = txnState;
- }
- else
- {
- state = (DeliveryState) outcome;
- }
- boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
-
- if(!(settled || action == null))
- {
- _unsettledMap.put(deliveryTag, action);
- }
- _endpoint.updateAllDisposition(deliveryTag, state, settled);
- }
-
-
-
- public void close()
- {
- _endpoint.setTarget(null);
- _endpoint.close();
- Message msg;
- while((msg = receive(-1l)) != null)
- {
- release(msg);
- }
-
- }
-
-
- public void detach()
- {
- _endpoint.setTarget(null);
- _endpoint.detach();
- Message msg;
- while((msg = receive(-1l)) != null)
- {
- release(msg);
- }
-
- }
-
- public void drain()
- {
- _endpoint.drain();
- }
-
- /**
- * Waits for the receiver to drain or a message to be available to be received.
- * @return true if the receiver has been drained.
- */
- public boolean drainWait()
- {
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- try
- {
- while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
- {
- lock.wait();
- }
- }
- catch (InterruptedException e)
- {
- }
- }
- return _prefetchQueue.peek()==null && _endpoint.isDrained();
- }
-
- /**
- * Clears the receiver drain so that message delivery can resume.
- */
- public void clearDrain()
- {
- _endpoint.clearDrain();
- }
-
- public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
- {
- _endpoint.setLinkCredit(credit);
- _endpoint.setTransactionId(txn == null ? null : txn.getTxnId());
- _endpoint.setCreditWindow(false);
-
- }
-
- public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
- {
- if(Boolean.TRUE.equals(settled))
- {
- SettledAction action = _unsettledMap.remove(deliveryTag);
- if(action != null)
- {
- action.onSettled(deliveryTag);
- }
- }
- }
-
- public Map<Binary, Outcome> getRemoteUnsettled()
- {
- return _endpoint.getInitialUnsettledMap();
- }
-
-
- public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener)
- {
- synchronized(_endpoint.getLock())
- {
- _messageArrivalListener = messageArrivalListener;
- int prefetchSize = _prefetchQueue.size();
- for(int i = 0; i < prefetchSize; i++)
- {
- postPrefetchAction();
- }
- }
- }
-
- public Session getSession()
- {
- return _session;
- }
-
- public org.apache.qpid.amqp_1_0.type.Source getSource()
- {
- return _endpoint.getSource();
- }
-
- public static interface SettledAction
- {
- public void onSettled(Binary deliveryTag);
- }
-
-
- public interface MessageArrivalListener
- {
- void messageArrived(Receiver receiver);
- }
-
- public void setRemoteErrorListener(Runnable listener)
- {
- _remoteErrorTask = listener;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class Receiver implements DeliveryStateHandler
+{
+ private ReceivingLinkEndpoint _endpoint;
+ private int _id;
+ private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100);
+ private Session _session;
+
+ private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
+ private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
+ private MessageArrivalListener _messageArrivalListener;
+ private org.apache.qpid.amqp_1_0.type.transport.Error _error;
+ private Runnable _remoteErrorTask;
+
+ public Receiver(final Session session,
+ final String linkName,
+ final Target target,
+ final Source source,
+ final AcknowledgeMode ackMode) throws ConnectionErrorException
+ {
+ this(session, linkName, target, source, ackMode, false);
+ }
+
+ public Receiver(final Session session,
+ final String linkName,
+ final Target target,
+ final Source source,
+ final AcknowledgeMode ackMode,
+ boolean isDurable) throws ConnectionErrorException
+ {
+ this(session,linkName,target,source,ackMode,isDurable,null);
+ }
+
+ public Receiver(final Session session,
+ final String linkName,
+ final Target target,
+ final Source source,
+ final AcknowledgeMode ackMode,
+ final boolean isDurable,
+ final Map<Binary,Outcome> unsettled) throws ConnectionErrorException
+ {
+
+ session.getConnection().checkNotClosed();
+ _session = session;
+ if(isDurable)
+ {
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ }
+ else if(source != null)
+ {
+ source.setDurable(TerminusDurability.NONE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ }
+ _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source,
+ UnsignedInteger.ZERO);
+
+ _endpoint.setDeliveryStateHandler(this);
+
+ switch(ackMode)
+ {
+ case ALO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ break;
+ case AMO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ break;
+ case EO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+ break;
+
+ }
+
+ _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener()
+ {
+ @Override public void messageTransfer(final Transfer xfr)
+ {
+ _prefetchQueue.add(xfr);
+ postPrefetchAction();
+ }
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ _error = detach.getError();
+ if(detach.getError()!=null)
+ {
+ remoteError();
+ }
+ super.remoteDetached(endpoint, detach);
+ }
+ });
+
+ _endpoint.setLocalUnsettled(unsettled);
+ _endpoint.attach();
+
+ synchronized(_endpoint.getLock())
+ {
+ while(!_endpoint.isAttached() && !_endpoint.isDetached())
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ if(_endpoint.getSource() == null)
+ {
+ synchronized(_endpoint.getLock())
+ {
+ while(!_endpoint.isDetached())
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ throw new ConnectionErrorException(getError());
+ }
+ else
+ {
+
+ }
+ }
+
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
+ private void postPrefetchAction()
+ {
+ if(_messageArrivalListener != null)
+ {
+ _messageArrivalListener.messageArrived(this);
+ }
+ }
+
+ public void setCredit(UnsignedInteger credit, boolean window)
+ {
+ _endpoint.setLinkCredit(credit);
+ _endpoint.setCreditWindow(window);
+
+ }
+
+
+ public String getAddress()
+ {
+ return ((Source)_endpoint.getSource()).getAddress();
+ }
+
+ public Map getFilter()
+ {
+ return ((Source)_endpoint.getSource()).getFilter();
+ }
+
+ public Message receive()
+ {
+ return receive(-1L);
+ }
+
+ public Message receive(boolean wait)
+ {
+ return receive(wait ? -1L : 0L);
+ }
+
+ // 0 means no wait, -1 wait forever
+ public Message receive(long wait)
+ {
+ Message m = null;
+ Transfer xfr;
+ long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L;
+
+ while((xfr = receiveFromPrefetch(wait)) != null )
+ {
+
+ if(!Boolean.TRUE.equals(xfr.getAborted()))
+ {
+ Binary deliveryTag = xfr.getDeliveryTag();
+ Boolean resume = xfr.getResume();
+
+ List<Section> sections = new ArrayList<Section>();
+ List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
+ int totalSize = 0;
+
+ boolean hasMore;
+ do
+ {
+ hasMore = Boolean.TRUE.equals(xfr.getMore());
+
+ ByteBuffer buf = xfr.getPayload();
+
+ if(buf != null)
+ {
+
+ totalSize += buf.remaining();
+
+ payloads.add(buf);
+ }
+ if(hasMore)
+ {
+ xfr = receiveFromPrefetch(-1l);
+ if(xfr== null)
+ {
+ // TODO - this is wrong!!!!
+ System.out.println("eeek");
+ }
+ }
+ }
+ while(hasMore && !Boolean.TRUE.equals(xfr.getAborted()));
+
+ if(!Boolean.TRUE.equals(xfr.getAborted()))
+ {
+ ByteBuffer allPayload = ByteBuffer.allocate(totalSize);
+ for(ByteBuffer payload : payloads)
+ {
+ allPayload.put(payload);
+ }
+ allPayload.flip();
+ SectionDecoder decoder = _session.getSectionDecoder();
+
+ try
+ {
+ sections = decoder.parseAll(allPayload);
+ }
+ catch (AmqpErrorException e)
+ {
+ // todo - throw a sensible error
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ m = new Message(sections);
+ m.setDeliveryTag(deliveryTag);
+ m.setResume(resume);
+ m.setReceiver(this);
+ break;
+ }
+ }
+
+ if(wait > 0L)
+ {
+ wait = endTime - System.currentTimeMillis();
+ if(wait <=0L)
+ {
+ break;
+ }
+ }
+ }
+
+
+ return m;
+
+ }
+
+ private Transfer receiveFromPrefetch(long wait)
+ {
+ long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L);
+ final Object lock = _endpoint.getLock();
+ synchronized(lock)
+ {
+ Transfer xfr;
+ while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached()
+ && wait != 0)
+ {
+ try
+ {
+ if(wait>0L)
+ {
+ lock.wait(wait);
+ }
+ else if(wait<0L)
+ {
+ lock.wait();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ if(wait > 0L)
+ {
+ wait = endTime - System.currentTimeMillis();
+ if(wait <= 0L)
+ {
+ break;
+ }
+ }
+
+ }
+ if(xfr != null)
+ {
+ _prefetchQueue.poll();
+
+ }
+
+ return xfr;
+ }
+
+ }
+
+
+ public void release(final Message m)
+ {
+ release(m.getDeliveryTag());
+ }
+
+ public void release(Binary deliveryTag)
+ {
+ update(new Released(), deliveryTag, null, null);
+ }
+
+
+ public void modified(Binary tag)
+ {
+ final Modified outcome = new Modified();
+ outcome.setDeliveryFailed(true);
+
+ update(outcome, tag, null, null);
+ }
+
+ public void acknowledge(final Message m)
+ {
+ acknowledge(m.getDeliveryTag());
+ }
+
+ public void acknowledge(final Message m, SettledAction a)
+ {
+ acknowledge(m.getDeliveryTag(), a);
+ }
+
+
+ public void acknowledge(final Message m, Transaction txn)
+ {
+ acknowledge(m.getDeliveryTag(), txn);
+ }
+
+
+ public void acknowledge(final Binary deliveryTag)
+ {
+ acknowledge(deliveryTag, null, null);
+ }
+
+
+ public void acknowledge(final Binary deliveryTag, SettledAction a)
+ {
+ acknowledge(deliveryTag, null, a);
+ }
+
+ public void acknowledge(final Binary deliveryTag, final Transaction txn)
+ {
+ acknowledge(deliveryTag, txn, null);
+ }
+
+ public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+ update(new Accepted(), deliveryTag, txn, action);
+ }
+
+ public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+
+ DeliveryState state;
+ if(txn != null)
+ {
+ TransactionalState txnState = new TransactionalState();
+ txnState.setOutcome(outcome);
+ txnState.setTxnId(txn.getTxnId());
+ state = txnState;
+ }
+ else
+ {
+ state = (DeliveryState) outcome;
+ }
+ boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
+
+ if(!(settled || action == null))
+ {
+ _unsettledMap.put(deliveryTag, action);
+ }
+
+ _endpoint.updateDisposition(deliveryTag,state, settled);
+ }
+
+ public Error getError()
+ {
+ return _error;
+ }
+
+ public void acknowledgeAll(Message m)
+ {
+ acknowledgeAll(m.getDeliveryTag());
+ }
+
+ public void acknowledgeAll(Binary deliveryTag)
+ {
+ acknowledgeAll(deliveryTag, null, null);
+ }
+
+ public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+ updateAll(new Accepted(), deliveryTag, txn, action);
+ }
+
+ public void updateAll(Outcome outcome, Binary deliveryTag)
+ {
+ updateAll(outcome, deliveryTag, null, null);
+ }
+
+ public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+ DeliveryState state;
+
+ if(txn != null)
+ {
+ TransactionalState txnState = new TransactionalState();
+ txnState.setOutcome(outcome);
+ txnState.setTxnId(txn.getTxnId());
+ state = txnState;
+ }
+ else
+ {
+ state = (DeliveryState) outcome;
+ }
+ boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
+
+ if(!(settled || action == null))
+ {
+ _unsettledMap.put(deliveryTag, action);
+ }
+ _endpoint.updateAllDisposition(deliveryTag, state, settled);
+ }
+
+
+
+ public void close()
+ {
+ _endpoint.setTarget(null);
+ _endpoint.close();
+ Message msg;
+ while((msg = receive(-1l)) != null)
+ {
+ release(msg);
+ }
+
+ }
+
+
+ public void detach()
+ {
+ _endpoint.setTarget(null);
+ _endpoint.detach();
+ Message msg;
+ while((msg = receive(-1l)) != null)
+ {
+ release(msg);
+ }
+
+ }
+
+ public void drain()
+ {
+ _endpoint.drain();
+ }
+
+ /**
+ * Waits for the receiver to drain or a message to be available to be received.
+ * @return true if the receiver has been drained.
+ */
+ public boolean drainWait()
+ {
+ final Object lock = _endpoint.getLock();
+ synchronized(lock)
+ {
+ try
+ {
+ while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
+ {
+ lock.wait();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ return _prefetchQueue.peek()==null && _endpoint.isDrained();
+ }
+
+ /**
+ * Clears the receiver drain so that message delivery can resume.
+ */
+ public void clearDrain()
+ {
+ _endpoint.clearDrain();
+ }
+
+ public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
+ {
+ _endpoint.setLinkCredit(credit);
+ _endpoint.setTransactionId(txn == null ? null : txn.getTxnId());
+ _endpoint.setCreditWindow(false);
+
+ }
+
+ public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+ {
+ if(Boolean.TRUE.equals(settled))
+ {
+ SettledAction action = _unsettledMap.remove(deliveryTag);
+ if(action != null)
+ {
+ action.onSettled(deliveryTag);
+ }
+ }
+ }
+
+ public Map<Binary, Outcome> getRemoteUnsettled()
+ {
+ return _endpoint.getInitialUnsettledMap();
+ }
+
+
+ public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener)
+ {
+ synchronized(_endpoint.getLock())
+ {
+ _messageArrivalListener = messageArrivalListener;
+ int prefetchSize = _prefetchQueue.size();
+ for(int i = 0; i < prefetchSize; i++)
+ {
+ postPrefetchAction();
+ }
+ }
+ }
+
+ public Session getSession()
+ {
+ return _session;
+ }
+
+ public org.apache.qpid.amqp_1_0.type.Source getSource()
+ {
+ return _endpoint.getSource();
+ }
+
+ public static interface SettledAction
+ {
+ public void onSettled(Binary deliveryTag);
+ }
+
+
+ public interface MessageArrivalListener
+ {
+ void messageArrived(Receiver receiver);
+ }
+
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org