You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 00:05:07 UTC

svn commit: r1534394 [17/22] - in /qpid/branches/linearstore/qpid: ./ cpp/ cpp/bindings/qmf2/examples/python/ cpp/bindings/qmf2/python/ cpp/bindings/qpid/dotnet/ cpp/etc/ cpp/examples/ cpp/examples/messaging/ cpp/examples/qmf-agent/ cpp/include/qpid/ c...

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java Mon Oct 21 22:04:51 2013
@@ -1,452 +1,497 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.Source;
-import org.apache.qpid.amqp_1_0.type.Target;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class Sender implements DeliveryStateHandler
-{
-    private SendingLinkEndpoint _endpoint;
-    private int _id;
-    private Session _session;
-    private int _windowSize;
-    private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
-    private boolean _closed;
-    private Error _error;
-    private Runnable _remoteErrorTask;
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, targetAddr, sourceAddr, false);
-    }
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
-                  boolean synchronous)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
-    }
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
-                  int window) throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
-    }
-
-
-    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
-                  int window) throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, target, source, window, AcknowledgeMode.ALO);
-    }
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
-                  int window, AcknowledgeMode mode)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, targetAddr, sourceAddr, window, mode, null);
-    }
-
-    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
-                  int window, AcknowledgeMode mode)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, target, source, window, mode, null);
-    }
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
-                  int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
-    }
-
-    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
-                  int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
-            throws SenderCreationException, ConnectionClosedException
-    {
-        this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
-    }
-
-    private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
-    {
-        org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
-        source.setAddress(sourceAddr);
-        return source;
-    }
-
-    private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
-    {
-        org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
-        target.setAddress(targetAddr);
-        if(isDurable)
-        {
-            target.setDurable(TerminusDurability.UNSETTLED_STATE);
-            target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
-        }
-        return target;
-    }
-
-    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
-                  int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
-            throws SenderCreationException, ConnectionClosedException
-    {
-
-        _session = session;
-        session.getConnection().checkNotClosed();
-        _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
-                                                                    source, target, unsettled);
-
-
-        switch(mode)
-        {
-            case ALO:
-                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
-                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
-                break;
-            case AMO:
-                _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
-                break;
-            case EO:
-                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
-                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
-                break;
-
-        }
-        _endpoint.setDeliveryStateHandler(this);
-        _endpoint.attach();
-        _windowSize = window;
-
-        synchronized(_endpoint.getLock())
-        {
-            while(!(_endpoint.isAttached() || _endpoint.isDetached()))
-            {
-                try
-                {
-                    _endpoint.getLock().wait();
-                }
-                catch (InterruptedException e)
-                {
-                    throw new SenderCreationException(e);
-                }
-            }
-            if(_endpoint.getTarget()== null)
-            {
-                throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
-            };
-        }
-
-        _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
-        {
-
-            @Override
-            public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
-            {
-                _error = detach.getError();
-                if(_error != null)
-                {
-                    remoteError();
-                }
-                super.remoteDetached(endpoint, detach);
-            }
-        });
-    }
-
-    public Source getSource()
-    {
-        return _endpoint.getSource();
-    }
-
-    public Target getTarget()
-    {
-        return _endpoint.getTarget();
-    }
-
-    public void send(Message message) throws LinkDetachedException
-    {
-        send(message, null, null);
-    }
-
-    public void send(Message message, final OutcomeAction action) throws LinkDetachedException
-    {
-        send(message, null, action);
-    }
-
-    public void send(Message message, final Transaction txn) throws LinkDetachedException
-    {
-        send(message, txn, null);
-    }
-
-    public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
-    {
-
-        List<Section> sections = message.getPayload();
-
-        Transfer xfr = new Transfer();
-
-        if(sections != null && !sections.isEmpty())
-        {
-            SectionEncoder encoder = _session.getSectionEncoder();
-            encoder.reset();
-
-            int sectionNumber = 0;
-            for(Section section : sections)
-            {
-                encoder.encodeObject(section);
-            }
-
-
-            Binary encoding = encoder.getEncoding();
-            ByteBuffer payload = encoding.asByteBuffer();
-            xfr.setPayload(payload);
-        }
-        if(message.getDeliveryTag() == null)
-        {
-            message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
-        }
-        if(message.isResume())
-        {
-            xfr.setResume(Boolean.TRUE);
-        }
-        if(message.getDeliveryState() != null)
-        {
-            xfr.setState(message.getDeliveryState());
-        }
-
-        xfr.setDeliveryTag(message.getDeliveryTag());
-        //xfr.setSettled(_windowSize ==0);
-        if(txn != null)
-        {
-            xfr.setSettled(false);
-            TransactionalState deliveryState = new TransactionalState();
-            deliveryState.setTxnId(txn.getTxnId());
-            xfr.setState(deliveryState);
-        }
-        else
-        {
-            xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
-        }
-        final Object lock = _endpoint.getLock();
-        synchronized(lock)
-        {
-            while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
-            {
-                try
-                {
-                    lock.wait();
-                }
-                catch (InterruptedException e)
-                {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
-            }
-            if(_endpoint.isDetached())
-            {
-                throw new LinkDetachedException(_error);
-            }
-            if(action != null)
-            {
-                _outcomeActions.put(message.getDeliveryTag(), action);
-            }
-            _endpoint.transfer(xfr);
-            //TODO - rationalise sending of flows
-            // _endpoint.sendFlow();
-        }
-
-        if(_windowSize != 0)
-        {
-            synchronized(lock)
-            {
-
-
-                while(_endpoint.getUnsettledCount() >= _windowSize)
-                {
-                    try
-                    {
-                        lock.wait();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    }
-                }
-            }
-
-        }
-
-
-    }
-
-    public void close() throws SenderClosingException
-    {
-
-        if(_windowSize != 0)
-        {
-            synchronized(_endpoint.getLock())
-            {
-
-
-                while(_endpoint.getUnsettledCount() > 0)
-                {
-                    try
-                    {
-                        _endpoint.getLock().wait();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    }
-                }
-            }
-
-        }
-        _session.removeSender(this);
-        _endpoint.setSource(null);
-        _endpoint.detach();
-        _closed = true;
-
-        synchronized(_endpoint.getLock())
-        {
-            while(!_endpoint.isDetached())
-            {
-                try
-                {
-                    _endpoint.getLock().wait();
-                }
-                catch (InterruptedException e)
-                {
-                    throw new SenderClosingException(e);
-                }
-            }
-        }
-    }
-
-    public boolean isClosed()
-    {
-        return _closed;
-    }
-
-    public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
-    {
-        if(state instanceof Outcome)
-        {
-            OutcomeAction action;
-            if((action = _outcomeActions.remove(deliveryTag)) != null)
-            {
-                action.onOutcome(deliveryTag, (Outcome) state);
-            }
-            if(!Boolean.TRUE.equals(settled))
-            {
-                _endpoint.updateDisposition(deliveryTag, state, true);
-            }
-        }
-        else if(state instanceof TransactionalState)
-        {
-            OutcomeAction action;
-
-            if((action = _outcomeActions.remove(deliveryTag)) != null)
-            {
-                action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome());
-            }
-
-        }
-    }
-
-    public SendingLinkEndpoint getEndpoint()
-    {
-        return _endpoint;
-    }
-
-    public Map<Binary, DeliveryState> getRemoteUnsettled()
-    {
-        return _endpoint.getInitialUnsettledMap();
-    }
-
-    public Session getSession()
-    {
-        return _session;
-    }
-
-
-    private void remoteError()
-    {
-        if(_remoteErrorTask != null)
-        {
-            _remoteErrorTask.run();
-        }
-    }
-
-
-    public void setRemoteErrorListener(Runnable listener)
-    {
-        _remoteErrorTask = listener;
-    }
-
-    public Error getError()
-    {
-        return _error;
-    }
-
-    public class SenderCreationException extends Exception
-    {
-        public SenderCreationException(Throwable e)
-        {
-            super(e);
-        }
-
-        public SenderCreationException(String e)
-        {
-            super(e);
-
-        }
-    }
-
-    public class SenderClosingException extends Exception
-    {
-        public SenderClosingException(Throwable e)
-        {
-            super(e);
-        }
-    }
-
-    public static interface OutcomeAction
-    {
-        public void onOutcome(Binary deliveryTag, Outcome outcome);
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.Source;
+import org.apache.qpid.amqp_1_0.type.Target;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+public class Sender implements DeliveryStateHandler
+{
+    private SendingLinkEndpoint _endpoint;
+    private int _id;
+    private Session _session;
+    private int _windowSize;
+    private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
+    private boolean _closed;
+    private Error _error;
+    private Runnable _remoteErrorTask;
+    private Outcome _defaultOutcome;
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, targetAddr, sourceAddr, false);
+    }
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+                  boolean synchronous)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
+    }
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+                  int window) throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
+    }
+
+
+    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+                  int window) throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, target, source, window, AcknowledgeMode.ALO);
+    }
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+                  int window, AcknowledgeMode mode)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, targetAddr, sourceAddr, window, mode, null);
+    }
+
+    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+                  int window, AcknowledgeMode mode)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, target, source, window, mode, null);
+    }
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+                  int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
+    }
+
+    public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+                  int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
+            throws SenderCreationException, ConnectionClosedException
+    {
+        this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
+    }
+
+    protected void configureSource(org.apache.qpid.amqp_1_0.type.messaging.Source source)
+    {
+
+    }
+
+    protected void configureTarget(org.apache.qpid.amqp_1_0.type.messaging.Target target)
+    {
+
+    }
+
+    private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
+    {
+        org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
+        source.setAddress(sourceAddr);
+        return source;
+    }
+
+    private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
+    {
+        org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
+        target.setAddress(targetAddr);
+        if(isDurable)
+        {
+            target.setDurable(TerminusDurability.UNSETTLED_STATE);
+            target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+        }
+        return target;
+    }
+
+    public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+                  int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
+            throws SenderCreationException, ConnectionClosedException
+    {
+
+        _session = session;
+        session.getConnection().checkNotClosed();
+        configureSource(source);
+        configureTarget(target);
+        _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
+                                                                    source, target, unsettled);
+
+
+        switch(mode)
+        {
+            case ALO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+                break;
+            case AMO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
+                break;
+            case EO:
+                _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+                _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+                break;
+
+        }
+        _endpoint.setDeliveryStateHandler(this);
+        _endpoint.attach();
+        _windowSize = window;
+
+        synchronized(_endpoint.getLock())
+        {
+            while(!(_endpoint.isAttached() || _endpoint.isDetached()))
+            {
+                try
+                {
+                    _endpoint.getLock().wait();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new SenderCreationException(e);
+                }
+            }
+            if(_endpoint.getTarget()== null)
+            {
+                throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
+            };
+        }
+
+        _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
+        {
+
+            @Override
+            public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+            {
+                _error = detach.getError();
+                if(_error != null)
+                {
+                    remoteError();
+                }
+                super.remoteDetached(endpoint, detach);
+            }
+        });
+
+        _defaultOutcome = source.getDefaultOutcome();
+        if(_defaultOutcome == null)
+        {
+            if(source.getOutcomes() == null || source.getOutcomes().length == 0)
+            {
+                _defaultOutcome = new Accepted();
+            }
+            else if(source.getOutcomes().length == 1)
+            {
+
+                final AMQPDescribedTypeRegistry describedTypeRegistry = _endpoint.getSession()
+                        .getConnection()
+                        .getDescribedTypeRegistry();
+
+                DescribedTypeConstructor constructor = describedTypeRegistry
+                        .getConstructor(source.getOutcomes()[0]);
+                if(constructor != null)
+                {
+                    Object impliedOutcome = constructor.construct(Collections.EMPTY_LIST);
+                    if(impliedOutcome instanceof Outcome)
+                    {
+                        _defaultOutcome = (Outcome) impliedOutcome;
+                    }
+                }
+
+            }
+        }
+    }
+
+    public Source getSource()
+    {
+        return _endpoint.getSource();
+    }
+
+    public Target getTarget()
+    {
+        return _endpoint.getTarget();
+    }
+
+    public void send(Message message) throws LinkDetachedException
+    {
+        send(message, null, null);
+    }
+
+    public void send(Message message, final OutcomeAction action) throws LinkDetachedException
+    {
+        send(message, null, action);
+    }
+
+    public void send(Message message, final Transaction txn) throws LinkDetachedException
+    {
+        send(message, txn, null);
+    }
+
+    public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
+    {
+
+        List<Section> sections = message.getPayload();
+
+        Transfer xfr = new Transfer();
+
+        if(sections != null && !sections.isEmpty())
+        {
+            SectionEncoder encoder = _session.getSectionEncoder();
+            encoder.reset();
+
+            int sectionNumber = 0;
+            for(Section section : sections)
+            {
+                encoder.encodeObject(section);
+            }
+
+
+            Binary encoding = encoder.getEncoding();
+            ByteBuffer payload = encoding.asByteBuffer();
+            xfr.setPayload(payload);
+        }
+        if(message.getDeliveryTag() == null)
+        {
+            message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
+        }
+        if(message.isResume())
+        {
+            xfr.setResume(Boolean.TRUE);
+        }
+        if(message.getDeliveryState() != null)
+        {
+            xfr.setState(message.getDeliveryState());
+        }
+
+        xfr.setDeliveryTag(message.getDeliveryTag());
+        //xfr.setSettled(_windowSize ==0);
+        if(txn != null)
+        {
+            xfr.setSettled(false);
+            TransactionalState deliveryState = new TransactionalState();
+            deliveryState.setTxnId(txn.getTxnId());
+            xfr.setState(deliveryState);
+        }
+        else
+        {
+            xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
+        }
+        final Object lock = _endpoint.getLock();
+        synchronized(lock)
+        {
+            while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
+            {
+                try
+                {
+                    lock.wait();
+                }
+                catch (InterruptedException e)
+                {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+            }
+            if(_endpoint.isDetached())
+            {
+                throw new LinkDetachedException(_error);
+            }
+            if(action != null)
+            {
+                _outcomeActions.put(message.getDeliveryTag(), action);
+            }
+            _endpoint.transfer(xfr);
+            //TODO - rationalise sending of flows
+            // _endpoint.sendFlow();
+        }
+
+        if(_windowSize != 0)
+        {
+            synchronized(lock)
+            {
+
+
+                while(_endpoint.getUnsettledCount() >= _windowSize)
+                {
+                    try
+                    {
+                        lock.wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    }
+                }
+            }
+
+        }
+
+
+    }
+
+    public void close() throws SenderClosingException
+    {
+
+        if(_windowSize != 0)
+        {
+            synchronized(_endpoint.getLock())
+            {
+
+
+                while(_endpoint.getUnsettledCount() > 0)
+                {
+                    try
+                    {
+                        _endpoint.getLock().wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    }
+                }
+            }
+
+        }
+        _session.removeSender(this);
+        _endpoint.setSource(null);
+        _endpoint.detach();
+        _closed = true;
+
+        synchronized(_endpoint.getLock())
+        {
+            while(!_endpoint.isDetached())
+            {
+                try
+                {
+                    _endpoint.getLock().wait();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new SenderClosingException(e);
+                }
+            }
+        }
+    }
+
+    public boolean isClosed()
+    {
+        return _closed;
+    }
+
+    public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+    {
+        if(state instanceof Outcome)
+        {
+            OutcomeAction action;
+            if((action = _outcomeActions.remove(deliveryTag)) != null)
+            {
+
+                final Outcome outcome = (Outcome) state;
+                action.onOutcome(deliveryTag, (outcome == null && settled) ? _defaultOutcome : outcome);
+            }
+            if(!Boolean.TRUE.equals(settled))
+            {
+                _endpoint.updateDisposition(deliveryTag, state, true);
+            }
+        }
+        else if(state instanceof TransactionalState)
+        {
+            OutcomeAction action;
+            if((action = _outcomeActions.remove(deliveryTag)) != null)
+            {
+                final Outcome outcome = ((TransactionalState) state).getOutcome();
+                action.onOutcome(deliveryTag, outcome == null ? _defaultOutcome : outcome);
+            }
+
+        }
+    }
+
+    public SendingLinkEndpoint getEndpoint()
+    {
+        return _endpoint;
+    }
+
+    public Map<Binary, DeliveryState> getRemoteUnsettled()
+    {
+        return _endpoint.getInitialUnsettledMap();
+    }
+
+    public Session getSession()
+    {
+        return _session;
+    }
+
+
+    private void remoteError()
+    {
+        if(_remoteErrorTask != null)
+        {
+            _remoteErrorTask.run();
+        }
+    }
+
+
+    public void setRemoteErrorListener(Runnable listener)
+    {
+        _remoteErrorTask = listener;
+    }
+
+    public Error getError()
+    {
+        return _error;
+    }
+
+    public class SenderCreationException extends Exception
+    {
+        public SenderCreationException(Throwable e)
+        {
+            super(e);
+        }
+
+        public SenderCreationException(String e)
+        {
+            super(e);
+
+        }
+    }
+
+    public class SenderClosingException extends Exception
+    {
+        public SenderClosingException(Throwable e)
+        {
+            super(e);
+        }
+    }
+
+    public static interface OutcomeAction
+    {
+        public void onOutcome(Binary deliveryTag, Outcome outcome);
+    }
+}

Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java Mon Oct 21 22:04:51 2013
@@ -1,384 +1,402 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionState;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
-import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public class Session
-{
-    private SessionEndpoint _endpoint;
-    private List<Receiver> _receivers = new ArrayList<Receiver>();
-    private List<Sender> _senders = new ArrayList<Sender>();
-    private SectionEncoder _sectionEncoder;
-    private SectionDecoder _sectionDecoder;
-    private TransactionController _sessionLocalTC;
-    private Connection _connection;
-
-    public Session(final Connection connection, String name)
-    {
-        _connection = connection;
-        _endpoint = connection.getEndpoint().createSession(name);
-        _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
-        _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
-    }
-
-
-    public synchronized Sender createSender(final String targetName)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-        return createSender(targetName, false);
-    }
-
-    public synchronized Sender createSender(final String targetName, boolean synchronous)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-
-        final String sourceName = UUID.randomUUID().toString();
-        return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous);
-
-    }
-
-    public synchronized Sender createSender(final String targetName, int window)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-         final String sourceName = UUID.randomUUID().toString();
-         return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
-
-    }
-
-    public Sender createSender(String targetName, int window, AcknowledgeMode mode)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-
-        return createSender(targetName, window, mode, null);
-    }
-
-    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-        return createSender(targetName, window, mode, linkName, null);
-    }
-    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-        return createSender(targetName, window, mode, linkName, false, unsettled);
-    }
-
-    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
-                               boolean isDurable, Map<Binary, Outcome> unsettled)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-        return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
-                          targetName, null, window, mode, isDurable, unsettled);
-
-    }
-
-
-    public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
-    }
-
-
-    public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
-            throws ConnectionErrorException
-    {
-        return createReceiver(queue, null, mode);
-    }
-
-    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
-            throws ConnectionErrorException
-    {
-        return createReceiver(queue, null, mode, linkName);
-    }
-
-    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
-            throws ConnectionErrorException
-    {
-        return createReceiver(queue, null, mode, linkName, isDurable);
-    }
-
-    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
-                                   Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
-            throws ConnectionErrorException
-    {
-        return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
-    }
-
-
-    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
-                                   boolean isDurable, Map<Binary, Outcome> unsettled)
-            throws ConnectionErrorException
-    {
-        return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
-    }
-
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
-    }
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
-    }
-
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, mode, ackMode, null);
-    }
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode, String linkName)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr,mode, ackMode, linkName, false);
-    }
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
-    }
-
-    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable,
-                                            Map<Binary, Outcome> unsettled)
-            throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
-    }
-
-    public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable,
-                                            Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
-            throws ConnectionErrorException
-    {
-
-        final Target target = new Target();
-        final Source source = new Source();
-        source.setAddress(sourceAddr);
-        source.setDistributionMode(mode);
-        source.setFilter(filters);
-
-        if(linkName == null)
-        {
-            linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")";
-        }
-
-        final Receiver receiver =
-                new Receiver(this, linkName,
-                        target, source, ackMode, isDurable, unsettled);
-        _receivers.add(receiver);
-
-        return receiver;
-
-    }
-
-    public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, StdDistMode.COPY);
-    }
-
-    public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
-    {
-        return createReceiver(sourceAddr, StdDistMode.MOVE);
-    }
-
-    public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
-    {
-        Source source = new Source();
-        source.setDynamic(true);
-
-        final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(),
-                                               source, AcknowledgeMode.ALO);
-        _receivers.add(receiver);
-        return receiver;
-    }
-
-    public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException
-    {
-        Target target = new Target();
-        target.setDynamic(true);
-
-        final Sender sender;
-        sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target,
-                                                   new Source(), 0, AcknowledgeMode.ALO);
-        _senders.add(sender);
-        return sender;
-    }
-
-
-
-    public SessionEndpoint getEndpoint()
-    {
-        return _endpoint;
-    }
-
-    public synchronized void close()
-    {
-        try
-        {
-            for(Sender sender : new ArrayList<Sender>(_senders))
-            {
-                sender.close();
-            }
-            for(Receiver receiver : new ArrayList<Receiver>(_receivers))
-            {
-                receiver.detach();
-            }
-            if(_sessionLocalTC != null)
-            {
-                _sessionLocalTC.close();
-            }
-            _endpoint.end();
-        }
-        catch (Sender.SenderClosingException e)
-        {
-// TODO
-            e.printStackTrace();
-        }
-
-        //TODO
-
-    }
-
-    void removeSender(Sender sender)
-    {
-        _senders.remove(sender);
-    }
-
-    void removeReceiver(Receiver receiver)
-    {
-        _receivers.remove(receiver);
-    }
-
-    public SectionEncoder getSectionEncoder()
-    {
-        return _sectionEncoder;
-    }
-
-    public SectionDecoder getSectionDecoder()
-    {
-        return _sectionDecoder;
-    }
-
-
-    public Transaction createSessionLocalTransaction()
-    {
-        TransactionController localController = getSessionLocalTransactionController();
-        return localController.beginTransaction();
-    }
-
-    private TransactionController getSessionLocalTransactionController()
-    {
-        if(_sessionLocalTC == null)
-        {
-            _sessionLocalTC = createSessionLocalTransactionController();
-        }
-        return _sessionLocalTC;
-    }
-
-    private TransactionController createSessionLocalTransactionController()
-    {
-        String name = "txnControllerLink";
-        SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN,
-                                                                                   TxnCapability.MULTI_TXNS_PER_SSN);
-        tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
-        tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
-        tcLinkEndpoint.attach();
-        return new TransactionController(this, tcLinkEndpoint);
-    }
-
-
-    public Message receive()
-    {
-        while(getEndpoint().getState() == SessionState.ACTIVE)
-        {
-            synchronized (getEndpoint().getLock())
-            {
-                try
-                {
-                    for(Receiver r : _receivers)
-                    {
-                        Message m = r.receive(false);
-                        if(m != null)
-                            return m;
-                    }
-                    wait();
-                }
-                catch (InterruptedException e)
-                {
-                }
-            }
-        }
-        return null;
-    }
-
-    public Connection getConnection()
-    {
-        return _connection;
-    }
-
-    public void awaitActive()
-    {
-        synchronized(getEndpoint().getLock())
-        {
-            while(!getEndpoint().isEnded() && !getEndpoint().isActive())
-            {
-                try
-                {
-                    getEndpoint().getLock().wait();
-                }
-                catch (InterruptedException e)
-                {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
-            }
-        }
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionState;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class Session
+{
+    private SessionEndpoint _endpoint;
+    private List<Receiver> _receivers = new ArrayList<Receiver>();
+    private List<Sender> _senders = new ArrayList<Sender>();
+    private SectionEncoder _sectionEncoder;
+    private SectionDecoder _sectionDecoder;
+    private TransactionController _sessionLocalTC;
+    private Connection _connection;
+
+    public Session(final Connection connection, String name) throws SessionCreationException
+    {
+        _connection = connection;
+        _endpoint = connection.getEndpoint().createSession(name);
+        if(_endpoint == null)
+        {
+            throw new SessionCreationException("Cannot create session as all channels are in use");
+        }
+        _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
+        _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
+    }
+
+
+    public synchronized Sender createSender(final String targetName)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+
+        final String sourceName = UUID.randomUUID().toString();
+        return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false);
+
+    }
+
+
+    public synchronized Sender createSender(final String targetName, final SourceConfigurator configurator)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+
+        final String sourceName = UUID.randomUUID().toString();
+        return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false)
+        {
+            @Override
+            protected void configureSource(final Source source)
+            {
+                configurator.configureSource(source);
+            }
+        };
+
+    }
+
+    public synchronized Sender createSender(final String targetName, int window)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+         final String sourceName = UUID.randomUUID().toString();
+         return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
+
+    }
+
+    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        return createSender(targetName, window, mode, linkName, false, null);
+    }
+
+    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
+                               boolean isDurable, Map<Binary, Outcome> unsettled)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
+                          targetName, null, window, mode, isDurable, unsettled);
+
+    }
+
+
+    public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
+    }
+
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode);
+    }
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName);
+    }
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName, isDurable);
+    }
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
+                                   Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
+    }
+
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
+                                   boolean isDurable, Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
+    }
+
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
+    }
+
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                            final AcknowledgeMode ackMode)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, ackMode, null);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                            final AcknowledgeMode ackMode, String linkName)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr,mode, ackMode, linkName, false);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable,
+                                            Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
+    }
+
+    public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                            final AcknowledgeMode ackMode, String linkName, boolean isDurable,
+                                            Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+
+        final Target target = new Target();
+        final Source source = new Source();
+        source.setAddress(sourceAddr);
+        source.setDistributionMode(mode);
+        source.setFilter(filters);
+
+        if(linkName == null)
+        {
+            linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")";
+        }
+
+        final Receiver receiver =
+                new Receiver(this, linkName,
+                        target, source, ackMode, isDurable, unsettled);
+        _receivers.add(receiver);
+
+        return receiver;
+
+    }
+
+    public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, StdDistMode.COPY);
+    }
+
+    public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, StdDistMode.MOVE);
+    }
+
+    public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
+    {
+        Source source = new Source();
+        source.setDynamic(true);
+
+        final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(),
+                                               source, AcknowledgeMode.ALO);
+        _receivers.add(receiver);
+        return receiver;
+    }
+
+    public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        Target target = new Target();
+        target.setDynamic(true);
+
+        final Sender sender;
+        sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target,
+                                                   new Source(), 0, AcknowledgeMode.ALO);
+        _senders.add(sender);
+        return sender;
+    }
+
+
+
+    public SessionEndpoint getEndpoint()
+    {
+        return _endpoint;
+    }
+
+    public synchronized void close()
+    {
+        try
+        {
+            for(Sender sender : new ArrayList<Sender>(_senders))
+            {
+                sender.close();
+            }
+            for(Receiver receiver : new ArrayList<Receiver>(_receivers))
+            {
+                receiver.detach();
+            }
+            if(_sessionLocalTC != null)
+            {
+                _sessionLocalTC.close();
+            }
+            _endpoint.end();
+        }
+        catch (Sender.SenderClosingException e)
+        {
+// TODO
+            e.printStackTrace();
+        }
+
+        //TODO
+
+    }
+
+    void removeSender(Sender sender)
+    {
+        _senders.remove(sender);
+    }
+
+    void removeReceiver(Receiver receiver)
+    {
+        _receivers.remove(receiver);
+    }
+
+    public SectionEncoder getSectionEncoder()
+    {
+        return _sectionEncoder;
+    }
+
+    public SectionDecoder getSectionDecoder()
+    {
+        return _sectionDecoder;
+    }
+
+
+    public Transaction createSessionLocalTransaction()
+    {
+        TransactionController localController = getSessionLocalTransactionController();
+        return localController.beginTransaction();
+    }
+
+    private TransactionController getSessionLocalTransactionController()
+    {
+        if(_sessionLocalTC == null)
+        {
+            _sessionLocalTC = createSessionLocalTransactionController();
+        }
+        return _sessionLocalTC;
+    }
+
+    private TransactionController createSessionLocalTransactionController()
+    {
+        String name = "txnControllerLink";
+        SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN,
+                                                                                   TxnCapability.MULTI_TXNS_PER_SSN);
+        tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+        tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+        tcLinkEndpoint.attach();
+        return new TransactionController(this, tcLinkEndpoint);
+    }
+
+
+    public Message receive()
+    {
+        while(getEndpoint().getState() == SessionState.ACTIVE)
+        {
+            synchronized (getEndpoint().getLock())
+            {
+                try
+                {
+                    for(Receiver r : _receivers)
+                    {
+                        Message m = r.receive(false);
+                        if(m != null)
+                            return m;
+                    }
+                    wait();
+                }
+                catch (InterruptedException e)
+                {
+                }
+            }
+        }
+        return null;
+    }
+
+    public Connection getConnection()
+    {
+        return _connection;
+    }
+
+    public void awaitActive()
+    {
+        synchronized(getEndpoint().getLock())
+        {
+            while(!getEndpoint().isEnded() && !getEndpoint().isActive())
+            {
+                try
+                {
+                    getEndpoint().getLock().wait();
+                }
+                catch (InterruptedException e)
+                {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+            }
+        }
+    }
+
+    public static interface SourceConfigurator
+    {
+        public void configureSource(final Source source);
+    }
+
+    private class SessionCreationException extends ConnectionException
+    {
+
+        private SessionCreationException(final String message)
+        {
+            super(message);
+        }
+
+    }
+}

Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-common/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Mon Oct 21 22:04:51 2013
@@ -0,0 +1,3 @@
+*.iml
+target
+release

Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-common/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/amqp-1-0-common:r1501885-1534385

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java Mon Oct 21 22:04:51 2013
@@ -32,30 +32,6 @@ public class StringTypeConstructor exten
 {
     private Charset _charSet;
 
-    private BinaryString _defaultBinaryString = new BinaryString();
-    private ValueCache<BinaryString, String> _cachedValues = new ValueCache<BinaryString, String>(10);
-
-    private static final class ValueCache<K,V> extends LinkedHashMap<K,V>
-    {
-        private final int _cacheSize;
-
-        public ValueCache(int cacheSize)
-        {
-            _cacheSize = cacheSize;
-        }
-
-        @Override
-        protected boolean removeEldestEntry(Map.Entry<K, V> eldest)
-        {
-            return size() > _cacheSize;
-        }
-
-        public boolean isFull()
-        {
-            return size() == _cacheSize;
-        }
-    }
-
 
     public static StringTypeConstructor getInstance(int i, Charset c)
     {
@@ -84,44 +60,19 @@ public class StringTypeConstructor exten
         }
 
         int origPosition = in.position();
-        _defaultBinaryString.setData(in.array(), in.arrayOffset()+ origPosition, size);
-
-        BinaryString binaryStr = _defaultBinaryString;
-
-        boolean isFull = _cachedValues.isFull();
 
-        String str = isFull ? _cachedValues.remove(binaryStr) : _cachedValues.get(binaryStr);
-
-        if(str == null)
+        ByteBuffer dup = in.duplicate();
+        try
         {
-
-            ByteBuffer dup = in.duplicate();
-            try
-            {
-                dup.limit(dup.position()+size);
-            }
-            catch(IllegalArgumentException e)
-            {
-                throw new IllegalArgumentException("position: " + dup.position() + "size: " + size + " capacity: " + dup.capacity());
-            }
-            CharBuffer charBuf = _charSet.decode(dup);
-
-            str = charBuf.toString();
-
-            byte[] data = new byte[size];
-            in.get(data);
-            binaryStr = new BinaryString(data, 0, size);
-
-            _cachedValues.put(binaryStr, str);
+            dup.limit(dup.position()+size);
         }
-        else if(isFull)
+        catch(IllegalArgumentException e)
         {
-            byte[] data = new byte[size];
-            in.get(data);
-            binaryStr = new BinaryString(data, 0, size);
-
-            _cachedValues.put(binaryStr, str);
+            throw new IllegalArgumentException("position: " + dup.position() + "size: " + size + " capacity: " + dup.capacity());
         }
+        CharBuffer charBuf = _charSet.decode(dup);
+
+        String str = charBuf.toString();
 
         in.position(origPosition+size);
 

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java Mon Oct 21 22:04:51 2013
@@ -32,8 +32,6 @@ public class SymbolTypeConstructor exten
 {
     private static final Charset ASCII = Charset.forName("US-ASCII");
 
-    private BinaryString _defaultBinaryString = new BinaryString();
-
     private static final ConcurrentHashMap<BinaryString, Symbol> SYMBOL_MAP =
             new ConcurrentHashMap<BinaryString, Symbol>(2048);
 
@@ -62,9 +60,7 @@ public class SymbolTypeConstructor exten
             size = in.getInt();
         }
 
-        _defaultBinaryString.setData(in.array(), in.arrayOffset()+in.position(), size);
-
-        BinaryString binaryStr = _defaultBinaryString;
+        BinaryString binaryStr = new BinaryString(in.array(), in.arrayOffset()+in.position(), size);
 
         Symbol symbolVal = SYMBOL_MAP.get(binaryStr);
         if(symbolVal == null)

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java Mon Oct 21 22:04:51 2013
@@ -386,12 +386,14 @@ public class ConnectionHandler
         private BytesSource _bytesSource;
         private boolean _closed;
         private ConnectionEndpoint _conn;
+        private SocketExceptionHandler _exceptionHandler;
 
-        public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn)
+        public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler)
             {
                 _outputStream = outputStream;
                 _bytesSource = source;
                 _conn = conn;
+                _exceptionHandler = exceptionHandler;
             }
 
             public void run()
@@ -421,7 +423,7 @@ public class ConnectionHandler
             catch (IOException e)
             {
                 _closed = true;
-                e.printStackTrace();  //TODO
+                _exceptionHandler.processSocketException(e);
             }
         }
     }

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java Mon Oct 21 22:04:51 2013
@@ -184,7 +184,7 @@ public class FrameHandler implements Pro
                     // type
 
                     int type = in.get() & 0xFF;
-                    int channel = in.getShort() & 0xFF;
+                    int channel = in.getShort() & 0xFFFF;
 
                     if(type != 0 && type != 1)
                     {

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Mon Oct 21 22:04:51 2013
@@ -68,22 +68,22 @@ public class ConnectionEndpoint implemen
     private final Container _container;
     private Principal _user;
 
-    private static final short DEFAULT_CHANNEL_MAX = 255;
+    private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue();
     private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
 
 
     private ConnectionState _state = ConnectionState.UNOPENED;
-    private short _channelMax;
+    private short _channelMax = DEFAULT_CHANNEL_MAX;
     private int _maxFrameSize = 4096;
     private String _remoteContainerId;
 
     private SocketAddress _remoteAddress;
 
     // positioned by the *outgoing* channel
-    private SessionEndpoint[] _sendingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1];
+    private SessionEndpoint[] _sendingSessions;
 
     // positioned by the *incoming* channel
-    private SessionEndpoint[] _receivingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1];
+    private SessionEndpoint[] _receivingSessions;
     private boolean _closedForInput;
     private boolean _closedForOutput;
 
@@ -165,7 +165,7 @@ public class ConnectionEndpoint implemen
         }
         if (_state == ConnectionState.UNOPENED)
         {
-            sendOpen(DEFAULT_CHANNEL_MAX, DEFAULT_MAX_FRAME);
+            sendOpen(_channelMax, DEFAULT_MAX_FRAME);
             _state = ConnectionState.AWAITING_OPEN;
         }
     }
@@ -183,10 +183,10 @@ public class ConnectionEndpoint implemen
     public synchronized SessionEndpoint createSession(String name)
     {
         // todo assert connection state
-        SessionEndpoint endpoint = new SessionEndpoint(this);
         short channel = getFirstFreeChannel();
         if (channel != -1)
         {
+            SessionEndpoint endpoint = new SessionEndpoint(this);
             _sendingSessions[channel] = endpoint;
             endpoint.setSendingChannel(channel);
             Begin begin = new Begin();
@@ -196,13 +196,14 @@ public class ConnectionEndpoint implemen
 
             begin.setHandleMax(_handleMax);
             send(channel, begin);
+            return endpoint;
 
         }
         else
         {
-            // todo error
+            // TODO - report error
+            return null;
         }
-        return endpoint;
     }
 
 
@@ -235,7 +236,16 @@ public class ConnectionEndpoint implemen
     {
         Open open = new Open();
 
-        open.setChannelMax(UnsignedShort.valueOf(DEFAULT_CHANNEL_MAX));
+        if(_receivingSessions == null)
+        {
+            _receivingSessions = new SessionEndpoint[channelMax+1];
+            _sendingSessions = new SessionEndpoint[channelMax+1];
+        }
+        if(channelMax < _channelMax)
+        {
+            _channelMax = channelMax;
+        }
+        open.setChannelMax(UnsignedShort.valueOf(channelMax));
         open.setContainerId(_container.getId());
         open.setMaxFrameSize(getDesiredMaxFrameSize());
         open.setHostname(getRemoteHostname());
@@ -268,7 +278,7 @@ public class ConnectionEndpoint implemen
 
     short getFirstFreeChannel()
     {
-        for (int i = 0; i < _sendingSessions.length; i++)
+        for (int i = 0; i <= _channelMax; i++)
         {
             if (_sendingSessions[i] == null)
             {
@@ -288,10 +298,16 @@ public class ConnectionEndpoint implemen
     public synchronized void receiveOpen(short channel, Open open)
     {
 
-        _channelMax = open.getChannelMax() == null ? DEFAULT_CHANNEL_MAX
-                : open.getChannelMax().shortValue() < DEFAULT_CHANNEL_MAX
-                        ? DEFAULT_CHANNEL_MAX
-                        : open.getChannelMax().shortValue();
+        _channelMax = open.getChannelMax() == null ? _channelMax
+                : open.getChannelMax().shortValue() < _channelMax
+                        ? open.getChannelMax().shortValue()
+                        : _channelMax;
+
+        if(_receivingSessions == null)
+        {
+            _receivingSessions = new SessionEndpoint[_channelMax+1];
+            _sendingSessions = new SessionEndpoint[_channelMax+1];
+        }
 
         UnsignedInteger remoteDesiredMaxFrameSize =
                 open.getMaxFrameSize() == null ? UnsignedInteger.valueOf(DEFAULT_MAX_FRAME) : open.getMaxFrameSize();
@@ -380,13 +396,30 @@ public class ConnectionEndpoint implemen
         if (!_closedForInput)
         {
             _closedForInput = true;
-            for (int i = 0; i < _receivingSessions.length; i++)
+            switch(_state)
+            {
+                case UNOPENED:
+                case AWAITING_OPEN:
+                case CLOSE_SENT:
+                    _state = ConnectionState.CLOSED;
+                case OPEN:
+                    _state = ConnectionState.CLOSE_RECEIVED;
+                case CLOSED:
+                    // already sent our close - too late to do anything more
+                    break;
+                default:
+            }
+
+            if(_receivingSessions != null)
             {
-                if (_receivingSessions[i] != null)
+                for (int i = 0; i < _receivingSessions.length; i++)
                 {
-                    _receivingSessions[i].end();
-                    _receivingSessions[i] = null;
+                    if (_receivingSessions[i] != null)
+                    {
+                        _receivingSessions[i].end();
+                        _receivingSessions[i] = null;
 
+                    }
                 }
             }
         }
@@ -604,7 +637,6 @@ public class ConnectionEndpoint implemen
         }
     }
 
-
     public void invalidHeaderReceived()
     {
         // TODO
@@ -984,4 +1016,9 @@ public class ConnectionEndpoint implemen
     {
         return _remoteError;
     }
+
+    public void setChannelMax(final short channelMax)
+    {
+        _channelMax = channelMax;
+    }
 }

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java Mon Oct 21 22:04:51 2013
@@ -444,13 +444,25 @@ public abstract class LinkEndpoint<T ext
         sendFlow(_flowTransactionId != null);
     }
 
+    public void sendFlowWithEcho()
+    {
+        sendFlow(_flowTransactionId != null, true);
+    }
+
+
     public void sendFlow(boolean setTransactionId)
     {
+        sendFlow(setTransactionId, false);
+    }
+
+    public void sendFlow(boolean setTransactionId, boolean echo)
+    {
         if(_state == State.ATTACHED || _state == State.ATTACH_SENT)
         {
             Flow flow = new Flow();
             flow.setLinkCredit(_linkCredit);
             flow.setDeliveryCount(_deliveryCount);
+            flow.setEcho(echo);
             _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
             flow.setAvailable(_available);
             flow.setDrain(_drain);

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java Mon Oct 21 22:04:51 2013
@@ -288,7 +288,7 @@ public class ReceivingLinkEndpoint exten
             setDrain(true);
             _creditWindow = false;
             _drainLimit = getDeliveryCount().add(getLinkCredit());
-            sendFlow();
+            sendFlowWithEcho();
             getLock().notifyAll();
         }
     }

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java Mon Oct 21 22:04:51 2013
@@ -49,6 +49,7 @@ import org.apache.qpid.amqp_1_0.codec.Un
 import org.apache.qpid.amqp_1_0.codec.ValueWriter;
 
 
+import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.RestrictedType;
 import org.apache.qpid.amqp_1_0.type.transport.*;
 import org.apache.qpid.amqp_1_0.type.transport.codec.*;

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java Mon Oct 21 22:04:51 2013
@@ -152,7 +152,7 @@ public class Source
         return _outcomes;
     }
 
-    public void setOutcomes(Symbol[] outcomes)
+    public void setOutcomes(Symbol... outcomes)
     {
         _outcomes = outcomes;
     }

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java Mon Oct 21 22:04:51 2013
@@ -34,9 +34,10 @@ import java.util.List;
 
 public class AcceptedConstructor extends DescribedTypeConstructor<Accepted>
 {
+    public static final Symbol SYMBOL_CONSTRUCTOR = Symbol.valueOf("amqp:accepted:list");
     private static final Object[] DESCRIPTORS =
     {
-            Symbol.valueOf("amqp:accepted:list"),UnsignedLong.valueOf(0x0000000000000024L),
+            SYMBOL_CONSTRUCTOR,UnsignedLong.valueOf(0x0000000000000024L),
     };
 
     private static final AcceptedConstructor INSTANCE = new AcceptedConstructor();

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java Mon Oct 21 22:04:51 2013
@@ -34,9 +34,10 @@ import java.util.List;
 
 public class RejectedConstructor extends DescribedTypeConstructor<Rejected>
 {
+    public static final Symbol SYMBOL_CONSTRUCTOR = Symbol.valueOf("amqp:rejected:list");
     private static final Object[] DESCRIPTORS =
     {
-            Symbol.valueOf("amqp:rejected:list"),UnsignedLong.valueOf(0x0000000000000025L),
+            SYMBOL_CONSTRUCTOR,UnsignedLong.valueOf(0x0000000000000025L),
     };
 
     private static final RejectedConstructor INSTANCE = new RejectedConstructor();

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java Mon Oct 21 22:04:51 2013
@@ -231,7 +231,7 @@ public class SourceConstructor extends D
 
                     try
                     {
-                        obj.setDistributionMode( (DistributionMode) val );
+                        obj.setDistributionMode( StdDistMode.valueOf(val) );
                     }
                     catch(ClassCastException e)
                     {
@@ -326,7 +326,7 @@ public class SourceConstructor extends D
                             // TODO Error
                         }
                     }
-                    
+
                 }
 
 
@@ -360,7 +360,7 @@ public class SourceConstructor extends D
                             // TODO Error
                         }
                     }
-                    
+
                 }
 
 

Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java Mon Oct 21 22:04:51 2013
@@ -43,6 +43,8 @@ public class ConnectionError
     
     public static final ConnectionError REDIRECT = new ConnectionError(Symbol.valueOf("amqp:connection:redirect"));
     
+    public static final ConnectionError SOCKET_ERROR = new ConnectionError(Symbol.valueOf("amqp:connection:socket-error"));
+    
 
 
     private ConnectionError(Symbol val)
@@ -73,6 +75,11 @@ public class ConnectionError
             return "redirect";
         }
         
+        if(this == SOCKET_ERROR)
+        {
+            return "socket-error";
+        }
+        
         else
         {
             return String.valueOf(_val);
@@ -97,6 +104,11 @@ public class ConnectionError
         {
             return REDIRECT;
         }
+        
+        if(SOCKET_ERROR._val.equals(val))
+        {
+            return SOCKET_ERROR;
+        }
     
         // TODO ERROR
         return null;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org