You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 00:05:07 UTC
svn commit: r1534394 [17/22] - in /qpid/branches/linearstore/qpid: ./ cpp/
cpp/bindings/qmf2/examples/python/ cpp/bindings/qmf2/python/
cpp/bindings/qpid/dotnet/ cpp/etc/ cpp/examples/ cpp/examples/messaging/
cpp/examples/qmf-agent/ cpp/include/qpid/ c...
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java Mon Oct 21 22:04:51 2013
@@ -1,452 +1,497 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.Source;
-import org.apache.qpid.amqp_1_0.type.Target;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class Sender implements DeliveryStateHandler
-{
- private SendingLinkEndpoint _endpoint;
- private int _id;
- private Session _session;
- private int _windowSize;
- private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
- private boolean _closed;
- private Error _error;
- private Runnable _remoteErrorTask;
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, false);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- boolean synchronous)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window) throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
- }
-
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window) throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, target, source, window, AcknowledgeMode.ALO);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, mode, null);
- }
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window, AcknowledgeMode mode)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, target, source, window, mode, null);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
- }
-
- private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
- {
- org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
- source.setAddress(sourceAddr);
- return source;
- }
-
- private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
- {
- org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
- target.setAddress(targetAddr);
- if(isDurable)
- {
- target.setDurable(TerminusDurability.UNSETTLED_STATE);
- target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
- }
- return target;
- }
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
-
- _session = session;
- session.getConnection().checkNotClosed();
- _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
- source, target, unsettled);
-
-
- switch(mode)
- {
- case ALO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case AMO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
- break;
- case EO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
- break;
-
- }
- _endpoint.setDeliveryStateHandler(this);
- _endpoint.attach();
- _windowSize = window;
-
- synchronized(_endpoint.getLock())
- {
- while(!(_endpoint.isAttached() || _endpoint.isDetached()))
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- throw new SenderCreationException(e);
- }
- }
- if(_endpoint.getTarget()== null)
- {
- throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
- };
- }
-
- _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
- {
-
- @Override
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- _error = detach.getError();
- if(_error != null)
- {
- remoteError();
- }
- super.remoteDetached(endpoint, detach);
- }
- });
- }
-
- public Source getSource()
- {
- return _endpoint.getSource();
- }
-
- public Target getTarget()
- {
- return _endpoint.getTarget();
- }
-
- public void send(Message message) throws LinkDetachedException
- {
- send(message, null, null);
- }
-
- public void send(Message message, final OutcomeAction action) throws LinkDetachedException
- {
- send(message, null, action);
- }
-
- public void send(Message message, final Transaction txn) throws LinkDetachedException
- {
- send(message, txn, null);
- }
-
- public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
- {
-
- List<Section> sections = message.getPayload();
-
- Transfer xfr = new Transfer();
-
- if(sections != null && !sections.isEmpty())
- {
- SectionEncoder encoder = _session.getSectionEncoder();
- encoder.reset();
-
- int sectionNumber = 0;
- for(Section section : sections)
- {
- encoder.encodeObject(section);
- }
-
-
- Binary encoding = encoder.getEncoding();
- ByteBuffer payload = encoding.asByteBuffer();
- xfr.setPayload(payload);
- }
- if(message.getDeliveryTag() == null)
- {
- message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
- }
- if(message.isResume())
- {
- xfr.setResume(Boolean.TRUE);
- }
- if(message.getDeliveryState() != null)
- {
- xfr.setState(message.getDeliveryState());
- }
-
- xfr.setDeliveryTag(message.getDeliveryTag());
- //xfr.setSettled(_windowSize ==0);
- if(txn != null)
- {
- xfr.setSettled(false);
- TransactionalState deliveryState = new TransactionalState();
- deliveryState.setTxnId(txn.getTxnId());
- xfr.setState(deliveryState);
- }
- else
- {
- xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
- }
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- if(_endpoint.isDetached())
- {
- throw new LinkDetachedException(_error);
- }
- if(action != null)
- {
- _outcomeActions.put(message.getDeliveryTag(), action);
- }
- _endpoint.transfer(xfr);
- //TODO - rationalise sending of flows
- // _endpoint.sendFlow();
- }
-
- if(_windowSize != 0)
- {
- synchronized(lock)
- {
-
-
- while(_endpoint.getUnsettledCount() >= _windowSize)
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- }
-
-
- }
-
- public void close() throws SenderClosingException
- {
-
- if(_windowSize != 0)
- {
- synchronized(_endpoint.getLock())
- {
-
-
- while(_endpoint.getUnsettledCount() > 0)
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- }
- _session.removeSender(this);
- _endpoint.setSource(null);
- _endpoint.detach();
- _closed = true;
-
- synchronized(_endpoint.getLock())
- {
- while(!_endpoint.isDetached())
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- throw new SenderClosingException(e);
- }
- }
- }
- }
-
- public boolean isClosed()
- {
- return _closed;
- }
-
- public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
- {
- if(state instanceof Outcome)
- {
- OutcomeAction action;
- if((action = _outcomeActions.remove(deliveryTag)) != null)
- {
- action.onOutcome(deliveryTag, (Outcome) state);
- }
- if(!Boolean.TRUE.equals(settled))
- {
- _endpoint.updateDisposition(deliveryTag, state, true);
- }
- }
- else if(state instanceof TransactionalState)
- {
- OutcomeAction action;
-
- if((action = _outcomeActions.remove(deliveryTag)) != null)
- {
- action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome());
- }
-
- }
- }
-
- public SendingLinkEndpoint getEndpoint()
- {
- return _endpoint;
- }
-
- public Map<Binary, DeliveryState> getRemoteUnsettled()
- {
- return _endpoint.getInitialUnsettledMap();
- }
-
- public Session getSession()
- {
- return _session;
- }
-
-
- private void remoteError()
- {
- if(_remoteErrorTask != null)
- {
- _remoteErrorTask.run();
- }
- }
-
-
- public void setRemoteErrorListener(Runnable listener)
- {
- _remoteErrorTask = listener;
- }
-
- public Error getError()
- {
- return _error;
- }
-
- public class SenderCreationException extends Exception
- {
- public SenderCreationException(Throwable e)
- {
- super(e);
- }
-
- public SenderCreationException(String e)
- {
- super(e);
-
- }
- }
-
- public class SenderClosingException extends Exception
- {
- public SenderClosingException(Throwable e)
- {
- super(e);
- }
- }
-
- public static interface OutcomeAction
- {
- public void onOutcome(Binary deliveryTag, Outcome outcome);
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.Source;
+import org.apache.qpid.amqp_1_0.type.Target;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+public class Sender implements DeliveryStateHandler
+{
+ private SendingLinkEndpoint _endpoint;
+ private int _id;
+ private Session _session;
+ private int _windowSize;
+ private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
+ private boolean _closed;
+ private Error _error;
+ private Runnable _remoteErrorTask;
+ private Outcome _defaultOutcome;
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, targetAddr, sourceAddr, false);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ boolean synchronous)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window) throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
+ }
+
+
+ public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+ int window) throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, target, source, window, AcknowledgeMode.ALO);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window, AcknowledgeMode mode)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, targetAddr, sourceAddr, window, mode, null);
+ }
+
+ public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+ int window, AcknowledgeMode mode)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, target, source, window, mode, null);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
+ }
+
+ protected void configureSource(org.apache.qpid.amqp_1_0.type.messaging.Source source)
+ {
+
+ }
+
+ protected void configureTarget(org.apache.qpid.amqp_1_0.type.messaging.Target target)
+ {
+
+ }
+
+ private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
+ {
+ org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
+ source.setAddress(sourceAddr);
+ return source;
+ }
+
+ private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
+ {
+ org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
+ target.setAddress(targetAddr);
+ if(isDurable)
+ {
+ target.setDurable(TerminusDurability.UNSETTLED_STATE);
+ target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ }
+ return target;
+ }
+
+ public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+ int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
+ throws SenderCreationException, ConnectionClosedException
+ {
+
+ _session = session;
+ session.getConnection().checkNotClosed();
+ configureSource(source);
+ configureTarget(target);
+ _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
+ source, target, unsettled);
+
+
+ switch(mode)
+ {
+ case ALO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ break;
+ case AMO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
+ break;
+ case EO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+ break;
+
+ }
+ _endpoint.setDeliveryStateHandler(this);
+ _endpoint.attach();
+ _windowSize = window;
+
+ synchronized(_endpoint.getLock())
+ {
+ while(!(_endpoint.isAttached() || _endpoint.isDetached()))
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new SenderCreationException(e);
+ }
+ }
+ if(_endpoint.getTarget()== null)
+ {
+ throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
+ };
+ }
+
+ _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
+ {
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ _error = detach.getError();
+ if(_error != null)
+ {
+ remoteError();
+ }
+ super.remoteDetached(endpoint, detach);
+ }
+ });
+
+ _defaultOutcome = source.getDefaultOutcome();
+ if(_defaultOutcome == null)
+ {
+ if(source.getOutcomes() == null || source.getOutcomes().length == 0)
+ {
+ _defaultOutcome = new Accepted();
+ }
+ else if(source.getOutcomes().length == 1)
+ {
+
+ final AMQPDescribedTypeRegistry describedTypeRegistry = _endpoint.getSession()
+ .getConnection()
+ .getDescribedTypeRegistry();
+
+ DescribedTypeConstructor constructor = describedTypeRegistry
+ .getConstructor(source.getOutcomes()[0]);
+ if(constructor != null)
+ {
+ Object impliedOutcome = constructor.construct(Collections.EMPTY_LIST);
+ if(impliedOutcome instanceof Outcome)
+ {
+ _defaultOutcome = (Outcome) impliedOutcome;
+ }
+ }
+
+ }
+ }
+ }
+
+ public Source getSource()
+ {
+ return _endpoint.getSource();
+ }
+
+ public Target getTarget()
+ {
+ return _endpoint.getTarget();
+ }
+
+ public void send(Message message) throws LinkDetachedException
+ {
+ send(message, null, null);
+ }
+
+ public void send(Message message, final OutcomeAction action) throws LinkDetachedException
+ {
+ send(message, null, action);
+ }
+
+ public void send(Message message, final Transaction txn) throws LinkDetachedException
+ {
+ send(message, txn, null);
+ }
+
+ public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
+ {
+
+ List<Section> sections = message.getPayload();
+
+ Transfer xfr = new Transfer();
+
+ if(sections != null && !sections.isEmpty())
+ {
+ SectionEncoder encoder = _session.getSectionEncoder();
+ encoder.reset();
+
+ int sectionNumber = 0;
+ for(Section section : sections)
+ {
+ encoder.encodeObject(section);
+ }
+
+
+ Binary encoding = encoder.getEncoding();
+ ByteBuffer payload = encoding.asByteBuffer();
+ xfr.setPayload(payload);
+ }
+ if(message.getDeliveryTag() == null)
+ {
+ message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
+ }
+ if(message.isResume())
+ {
+ xfr.setResume(Boolean.TRUE);
+ }
+ if(message.getDeliveryState() != null)
+ {
+ xfr.setState(message.getDeliveryState());
+ }
+
+ xfr.setDeliveryTag(message.getDeliveryTag());
+ //xfr.setSettled(_windowSize ==0);
+ if(txn != null)
+ {
+ xfr.setSettled(false);
+ TransactionalState deliveryState = new TransactionalState();
+ deliveryState.setTxnId(txn.getTxnId());
+ xfr.setState(deliveryState);
+ }
+ else
+ {
+ xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
+ }
+ final Object lock = _endpoint.getLock();
+ synchronized(lock)
+ {
+ while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
+ {
+ try
+ {
+ lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ if(_endpoint.isDetached())
+ {
+ throw new LinkDetachedException(_error);
+ }
+ if(action != null)
+ {
+ _outcomeActions.put(message.getDeliveryTag(), action);
+ }
+ _endpoint.transfer(xfr);
+ //TODO - rationalise sending of flows
+ // _endpoint.sendFlow();
+ }
+
+ if(_windowSize != 0)
+ {
+ synchronized(lock)
+ {
+
+
+ while(_endpoint.getUnsettledCount() >= _windowSize)
+ {
+ try
+ {
+ lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ }
+
+
+ }
+
+ public void close() throws SenderClosingException
+ {
+
+ if(_windowSize != 0)
+ {
+ synchronized(_endpoint.getLock())
+ {
+
+
+ while(_endpoint.getUnsettledCount() > 0)
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ }
+ _session.removeSender(this);
+ _endpoint.setSource(null);
+ _endpoint.detach();
+ _closed = true;
+
+ synchronized(_endpoint.getLock())
+ {
+ while(!_endpoint.isDetached())
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new SenderClosingException(e);
+ }
+ }
+ }
+ }
+
+ public boolean isClosed()
+ {
+ return _closed;
+ }
+
+ public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+ {
+ if(state instanceof Outcome)
+ {
+ OutcomeAction action;
+ if((action = _outcomeActions.remove(deliveryTag)) != null)
+ {
+
+ final Outcome outcome = (Outcome) state;
+ action.onOutcome(deliveryTag, (outcome == null && settled) ? _defaultOutcome : outcome);
+ }
+ if(!Boolean.TRUE.equals(settled))
+ {
+ _endpoint.updateDisposition(deliveryTag, state, true);
+ }
+ }
+ else if(state instanceof TransactionalState)
+ {
+ OutcomeAction action;
+ if((action = _outcomeActions.remove(deliveryTag)) != null)
+ {
+ final Outcome outcome = ((TransactionalState) state).getOutcome();
+ action.onOutcome(deliveryTag, outcome == null ? _defaultOutcome : outcome);
+ }
+
+ }
+ }
+
+ public SendingLinkEndpoint getEndpoint()
+ {
+ return _endpoint;
+ }
+
+ public Map<Binary, DeliveryState> getRemoteUnsettled()
+ {
+ return _endpoint.getInitialUnsettledMap();
+ }
+
+ public Session getSession()
+ {
+ return _session;
+ }
+
+
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
+
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
+
+ public Error getError()
+ {
+ return _error;
+ }
+
+ public class SenderCreationException extends Exception
+ {
+ public SenderCreationException(Throwable e)
+ {
+ super(e);
+ }
+
+ public SenderCreationException(String e)
+ {
+ super(e);
+
+ }
+ }
+
+ public class SenderClosingException extends Exception
+ {
+ public SenderClosingException(Throwable e)
+ {
+ super(e);
+ }
+ }
+
+ public static interface OutcomeAction
+ {
+ public void onOutcome(Binary deliveryTag, Outcome outcome);
+ }
+}
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java Mon Oct 21 22:04:51 2013
@@ -1,384 +1,402 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionState;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
-import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public class Session
-{
- private SessionEndpoint _endpoint;
- private List<Receiver> _receivers = new ArrayList<Receiver>();
- private List<Sender> _senders = new ArrayList<Sender>();
- private SectionEncoder _sectionEncoder;
- private SectionDecoder _sectionDecoder;
- private TransactionController _sessionLocalTC;
- private Connection _connection;
-
- public Session(final Connection connection, String name)
- {
- _connection = connection;
- _endpoint = connection.getEndpoint().createSession(name);
- _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
- _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
- }
-
-
- public synchronized Sender createSender(final String targetName)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return createSender(targetName, false);
- }
-
- public synchronized Sender createSender(final String targetName, boolean synchronous)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
-
- final String sourceName = UUID.randomUUID().toString();
- return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous);
-
- }
-
- public synchronized Sender createSender(final String targetName, int window)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- final String sourceName = UUID.randomUUID().toString();
- return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
-
- }
-
- public Sender createSender(String targetName, int window, AcknowledgeMode mode)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
-
- return createSender(targetName, window, mode, null);
- }
-
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return createSender(targetName, window, mode, linkName, null);
- }
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return createSender(targetName, window, mode, linkName, false, unsettled);
- }
-
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
- targetName, null, window, mode, isDurable, unsettled);
-
- }
-
-
- public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
- }
-
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName, isDurable);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
- Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
- }
-
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
- }
-
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
- }
-
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, ackMode, null);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr,mode, ackMode, linkName, false);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable,
- Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
- }
-
- public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable,
- Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
-
- final Target target = new Target();
- final Source source = new Source();
- source.setAddress(sourceAddr);
- source.setDistributionMode(mode);
- source.setFilter(filters);
-
- if(linkName == null)
- {
- linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")";
- }
-
- final Receiver receiver =
- new Receiver(this, linkName,
- target, source, ackMode, isDurable, unsettled);
- _receivers.add(receiver);
-
- return receiver;
-
- }
-
- public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, StdDistMode.COPY);
- }
-
- public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, StdDistMode.MOVE);
- }
-
- public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
- {
- Source source = new Source();
- source.setDynamic(true);
-
- final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(),
- source, AcknowledgeMode.ALO);
- _receivers.add(receiver);
- return receiver;
- }
-
- public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException
- {
- Target target = new Target();
- target.setDynamic(true);
-
- final Sender sender;
- sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target,
- new Source(), 0, AcknowledgeMode.ALO);
- _senders.add(sender);
- return sender;
- }
-
-
-
- public SessionEndpoint getEndpoint()
- {
- return _endpoint;
- }
-
- public synchronized void close()
- {
- try
- {
- for(Sender sender : new ArrayList<Sender>(_senders))
- {
- sender.close();
- }
- for(Receiver receiver : new ArrayList<Receiver>(_receivers))
- {
- receiver.detach();
- }
- if(_sessionLocalTC != null)
- {
- _sessionLocalTC.close();
- }
- _endpoint.end();
- }
- catch (Sender.SenderClosingException e)
- {
-// TODO
- e.printStackTrace();
- }
-
- //TODO
-
- }
-
- void removeSender(Sender sender)
- {
- _senders.remove(sender);
- }
-
- void removeReceiver(Receiver receiver)
- {
- _receivers.remove(receiver);
- }
-
- public SectionEncoder getSectionEncoder()
- {
- return _sectionEncoder;
- }
-
- public SectionDecoder getSectionDecoder()
- {
- return _sectionDecoder;
- }
-
-
- public Transaction createSessionLocalTransaction()
- {
- TransactionController localController = getSessionLocalTransactionController();
- return localController.beginTransaction();
- }
-
- private TransactionController getSessionLocalTransactionController()
- {
- if(_sessionLocalTC == null)
- {
- _sessionLocalTC = createSessionLocalTransactionController();
- }
- return _sessionLocalTC;
- }
-
- private TransactionController createSessionLocalTransactionController()
- {
- String name = "txnControllerLink";
- SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN,
- TxnCapability.MULTI_TXNS_PER_SSN);
- tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- tcLinkEndpoint.attach();
- return new TransactionController(this, tcLinkEndpoint);
- }
-
-
- public Message receive()
- {
- while(getEndpoint().getState() == SessionState.ACTIVE)
- {
- synchronized (getEndpoint().getLock())
- {
- try
- {
- for(Receiver r : _receivers)
- {
- Message m = r.receive(false);
- if(m != null)
- return m;
- }
- wait();
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- return null;
- }
-
- public Connection getConnection()
- {
- return _connection;
- }
-
- public void awaitActive()
- {
- synchronized(getEndpoint().getLock())
- {
- while(!getEndpoint().isEnded() && !getEndpoint().isActive())
- {
- try
- {
- getEndpoint().getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionState;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Client-side wrapper around a {@link SessionEndpoint}. It creates {@link Sender}s and
+ * {@link Receiver}s on that endpoint, keeps track of the receivers (and temporary-queue
+ * senders) it has handed out so that {@link #close()} can tear them down, and lazily
+ * creates a session-local {@link TransactionController}.
+ */
+public class Session
+{
+    private SessionEndpoint _endpoint;
+    private List<Receiver> _receivers = new ArrayList<Receiver>();
+    private List<Sender> _senders = new ArrayList<Sender>();
+    private SectionEncoder _sectionEncoder;
+    private SectionDecoder _sectionDecoder;
+    private TransactionController _sessionLocalTC;
+    private Connection _connection;
+
+    /**
+     * Creates a session on the given connection.
+     *
+     * @param connection the owning connection
+     * @param name       the name passed to the connection endpoint's {@code createSession}
+     * @throws SessionCreationException if the connection endpoint returns no session endpoint
+     */
+    public Session(final Connection connection, String name) throws SessionCreationException
+    {
+        _connection = connection;
+        _endpoint = connection.getEndpoint().createSession(name);
+        if(_endpoint == null)
+        {
+            throw new SessionCreationException("Cannot create session as all channels are in use");
+        }
+        _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
+        _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
+    }
+
+
+    /**
+     * Creates a sender to {@code targetName} using a random UUID as the source name and
+     * {@code targetName<-sourceName} as the link name.
+     */
+    public synchronized Sender createSender(final String targetName)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+
+        final String sourceName = UUID.randomUUID().toString();
+        return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false);
+
+    }
+
+
+    /**
+     * As {@link #createSender(String)}, but the returned sender forwards its
+     * {@code configureSource} hook to the supplied configurator.
+     *
+     * @param configurator receives the {@link Source} handed to the sender's
+     *                     {@code configureSource} hook
+     */
+    public synchronized Sender createSender(final String targetName, final SourceConfigurator configurator)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+
+        final String sourceName = UUID.randomUUID().toString();
+        return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false)
+        {
+            @Override
+            protected void configureSource(final Source source)
+            {
+                configurator.configureSource(source);
+            }
+        };
+
+    }
+
+    /**
+     * Creates a sender to {@code targetName} with the given window, using a random UUID
+     * as the source name.
+     */
+    public synchronized Sender createSender(final String targetName, int window)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        final String sourceName = UUID.randomUUID().toString();
+        return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
+
+    }
+
+    /**
+     * Creates a non-durable sender with no unsettled map; see the six-argument overload.
+     */
+    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        return createSender(targetName, window, mode, linkName, false, null);
+    }
+
+    /**
+     * Creates a sender to {@code targetName}.
+     *
+     * @param linkName the link name to use; when {@code null} a name of the form
+     *                 {@code ->target(uuid)} is generated
+     */
+    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
+                               boolean isDurable, Map<Binary, Outcome> unsettled)
+            throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
+                          targetName, null, window, mode, isDurable, unsettled);
+
+    }
+
+
+    /**
+     * Creates a receiver on {@code sourceAddr} with no distribution mode and
+     * {@link AcknowledgeMode#ALO}.
+     */
+    public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
+    }
+
+
+    // The public overloads below all delegate with a null distribution mode.
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode);
+    }
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName);
+    }
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName, isDurable);
+    }
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
+                                   Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
+    }
+
+
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
+                                   boolean isDurable, Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+        return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
+    }
+
+
+    // The private overloads below fill in defaults (ALO ack mode, null link name,
+    // non-durable, no unsettled map, no filters) and funnel into the full public overload.
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
+    }
+
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                                 final AcknowledgeMode ackMode)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, ackMode, null);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                                 final AcknowledgeMode ackMode, String linkName)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr,mode, ackMode, linkName, false);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                                 final AcknowledgeMode ackMode, String linkName, boolean isDurable)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
+    }
+
+    private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                                 final AcknowledgeMode ackMode, String linkName, boolean isDurable,
+                                                 Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
+    }
+
+    /**
+     * Creates a receiver and records it in this session's receiver list.
+     *
+     * @param sourceAddr address set on the link's {@link Source}
+     * @param mode       distribution mode set on the source (may be {@code null})
+     * @param linkName   the link name; when {@code null} a name of the form
+     *                   {@code sourceAddr-> (uuid)} is generated
+     * @param filters    filter map set on the source (may be {@code null})
+     * @param unsettled  unsettled map passed through to the {@link Receiver}
+     */
+    public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+                                                final AcknowledgeMode ackMode, String linkName, boolean isDurable,
+                                                Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+            throws ConnectionErrorException
+    {
+
+        final Target target = new Target();
+        final Source source = new Source();
+        source.setAddress(sourceAddr);
+        source.setDistributionMode(mode);
+        source.setFilter(filters);
+
+        if(linkName == null)
+        {
+            linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")";
+        }
+
+        final Receiver receiver =
+                new Receiver(this, linkName,
+                        target, source, ackMode, isDurable, unsettled);
+        _receivers.add(receiver);
+
+        return receiver;
+
+    }
+
+    /** Creates a receiver on {@code sourceAddr} using {@link StdDistMode#COPY}. */
+    public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, StdDistMode.COPY);
+    }
+
+    /** Creates a receiver on {@code sourceAddr} using {@link StdDistMode#MOVE}. */
+    public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
+    {
+        return createReceiver(sourceAddr, StdDistMode.MOVE);
+    }
+
+    /**
+     * Creates a receiver whose source is marked dynamic and records it in the receiver list.
+     */
+    public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
+    {
+        Source source = new Source();
+        source.setDynamic(true);
+
+        // NOTE(review): the link name prefix is "tempSender" even though this is a receiver;
+        // left unchanged because the name is sent to the peer.
+        final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(),
+                                               source, AcknowledgeMode.ALO);
+        _receivers.add(receiver);
+        return receiver;
+    }
+
+    /**
+     * Creates a sender whose target is marked dynamic and records it in the sender list.
+     */
+    public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException
+    {
+        Target target = new Target();
+        target.setDynamic(true);
+
+        final Sender sender;
+        sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target,
+                                               new Source(), 0, AcknowledgeMode.ALO);
+        _senders.add(sender);
+        return sender;
+    }
+
+
+
+    public SessionEndpoint getEndpoint()
+    {
+        return _endpoint;
+    }
+
+    /**
+     * Closes every tracked sender, detaches every tracked receiver, closes the session-local
+     * transaction controller if one was created, and then ends the endpoint. A
+     * {@link Sender.SenderClosingException} aborts the remaining steps and is printed to
+     * standard error.
+     */
+    public synchronized void close()
+    {
+        try
+        {
+            // Iterate over copies: the lists are modified through removeSender/removeReceiver.
+            for(Sender sender : new ArrayList<Sender>(_senders))
+            {
+                sender.close();
+            }
+            for(Receiver receiver : new ArrayList<Receiver>(_receivers))
+            {
+                receiver.detach();
+            }
+            if(_sessionLocalTC != null)
+            {
+                _sessionLocalTC.close();
+            }
+            _endpoint.end();
+        }
+        catch (Sender.SenderClosingException e)
+        {
+// TODO
+            e.printStackTrace();
+        }
+
+        //TODO
+
+    }
+
+    void removeSender(Sender sender)
+    {
+        _senders.remove(sender);
+    }
+
+    void removeReceiver(Receiver receiver)
+    {
+        _receivers.remove(receiver);
+    }
+
+    public SectionEncoder getSectionEncoder()
+    {
+        return _sectionEncoder;
+    }
+
+    public SectionDecoder getSectionDecoder()
+    {
+        return _sectionDecoder;
+    }
+
+
+    /**
+     * Begins a transaction on the session-local transaction controller, creating the
+     * controller on first use.
+     */
+    public Transaction createSessionLocalTransaction()
+    {
+        TransactionController localController = getSessionLocalTransactionController();
+        return localController.beginTransaction();
+    }
+
+    private TransactionController getSessionLocalTransactionController()
+    {
+        if(_sessionLocalTC == null)
+        {
+            _sessionLocalTC = createSessionLocalTransactionController();
+        }
+        return _sessionLocalTC;
+    }
+
+    private TransactionController createSessionLocalTransactionController()
+    {
+        String name = "txnControllerLink";
+        SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN,
+                                                                                   TxnCapability.MULTI_TXNS_PER_SSN);
+        tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+        tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+        tcLinkEndpoint.attach();
+        return new TransactionController(this, tcLinkEndpoint);
+    }
+
+
+    /**
+     * Polls each tracked receiver (non-blocking) and returns the first message found,
+     * waiting on the endpoint lock between polls while the session is
+     * {@link SessionState#ACTIVE}.
+     *
+     * @return a message, or {@code null} once the session is no longer active
+     */
+    public Message receive()
+    {
+        // Interrupts do not abort the wait (as before), but the interrupt status is
+        // restored on exit instead of being silently discarded.
+        boolean interrupted = false;
+        try
+        {
+            while(getEndpoint().getState() == SessionState.ACTIVE)
+            {
+                final Object lock = getEndpoint().getLock();
+                synchronized (lock)
+                {
+                    try
+                    {
+                        for(Receiver r : _receivers)
+                        {
+                            Message m = r.receive(false);
+                            if(m != null)
+                            {
+                                return m;
+                            }
+                        }
+                        // Must wait on the monitor we actually hold; a bare wait() targets
+                        // this Session and throws IllegalMonitorStateException.
+                        lock.wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        interrupted = true;
+                    }
+                }
+            }
+            return null;
+        }
+        finally
+        {
+            if(interrupted)
+            {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    public Connection getConnection()
+    {
+        return _connection;
+    }
+
+    /**
+     * Blocks on the endpoint lock until the endpoint reports that it is either active or
+     * ended. Interrupts do not abort the wait; the interrupt status is restored on return.
+     */
+    public void awaitActive()
+    {
+        boolean interrupted = false;
+        synchronized(getEndpoint().getLock())
+        {
+            while(!getEndpoint().isEnded() && !getEndpoint().isActive())
+            {
+                try
+                {
+                    getEndpoint().getLock().wait();
+                }
+                catch (InterruptedException e)
+                {
+                    interrupted = true;
+                }
+            }
+        }
+        if(interrupted)
+        {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Hook used by {@link #createSender(String, SourceConfigurator)} to customise the
+     * {@link Source} of a sender.
+     */
+    public static interface SourceConfigurator
+    {
+        public void configureSource(final Source source);
+    }
+
+    /**
+     * Thrown by the {@link Session} constructor when no session endpoint could be obtained.
+     * Public and static so that callers can name it in a {@code catch} clause; only
+     * {@link Session} can construct it.
+     */
+    public static class SessionCreationException extends ConnectionException
+    {
+
+        private SessionCreationException(final String message)
+        {
+            super(message);
+        }
+
+    }
+}
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-common/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Mon Oct 21 22:04:51 2013
@@ -0,0 +1,3 @@
+*.iml
+target
+release
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-common/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/amqp-1-0-common:r1501885-1534385
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java Mon Oct 21 22:04:51 2013
@@ -32,30 +32,6 @@ public class StringTypeConstructor exten
{
private Charset _charSet;
- private BinaryString _defaultBinaryString = new BinaryString();
- private ValueCache<BinaryString, String> _cachedValues = new ValueCache<BinaryString, String>(10);
-
- private static final class ValueCache<K,V> extends LinkedHashMap<K,V>
- {
- private final int _cacheSize;
-
- public ValueCache(int cacheSize)
- {
- _cacheSize = cacheSize;
- }
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<K, V> eldest)
- {
- return size() > _cacheSize;
- }
-
- public boolean isFull()
- {
- return size() == _cacheSize;
- }
- }
-
public static StringTypeConstructor getInstance(int i, Charset c)
{
@@ -84,44 +60,19 @@ public class StringTypeConstructor exten
}
int origPosition = in.position();
- _defaultBinaryString.setData(in.array(), in.arrayOffset()+ origPosition, size);
-
- BinaryString binaryStr = _defaultBinaryString;
-
- boolean isFull = _cachedValues.isFull();
- String str = isFull ? _cachedValues.remove(binaryStr) : _cachedValues.get(binaryStr);
-
- if(str == null)
+ ByteBuffer dup = in.duplicate();
+ try
{
-
- ByteBuffer dup = in.duplicate();
- try
- {
- dup.limit(dup.position()+size);
- }
- catch(IllegalArgumentException e)
- {
- throw new IllegalArgumentException("position: " + dup.position() + "size: " + size + " capacity: " + dup.capacity());
- }
- CharBuffer charBuf = _charSet.decode(dup);
-
- str = charBuf.toString();
-
- byte[] data = new byte[size];
- in.get(data);
- binaryStr = new BinaryString(data, 0, size);
-
- _cachedValues.put(binaryStr, str);
+ dup.limit(dup.position()+size);
}
- else if(isFull)
+ catch(IllegalArgumentException e)
{
- byte[] data = new byte[size];
- in.get(data);
- binaryStr = new BinaryString(data, 0, size);
-
- _cachedValues.put(binaryStr, str);
+ throw new IllegalArgumentException("position: " + dup.position() + "size: " + size + " capacity: " + dup.capacity());
}
+ CharBuffer charBuf = _charSet.decode(dup);
+
+ String str = charBuf.toString();
in.position(origPosition+size);
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java Mon Oct 21 22:04:51 2013
@@ -32,8 +32,6 @@ public class SymbolTypeConstructor exten
{
private static final Charset ASCII = Charset.forName("US-ASCII");
- private BinaryString _defaultBinaryString = new BinaryString();
-
private static final ConcurrentHashMap<BinaryString, Symbol> SYMBOL_MAP =
new ConcurrentHashMap<BinaryString, Symbol>(2048);
@@ -62,9 +60,7 @@ public class SymbolTypeConstructor exten
size = in.getInt();
}
- _defaultBinaryString.setData(in.array(), in.arrayOffset()+in.position(), size);
-
- BinaryString binaryStr = _defaultBinaryString;
+ BinaryString binaryStr = new BinaryString(in.array(), in.arrayOffset()+in.position(), size);
Symbol symbolVal = SYMBOL_MAP.get(binaryStr);
if(symbolVal == null)
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java Mon Oct 21 22:04:51 2013
@@ -386,12 +386,14 @@ public class ConnectionHandler
private BytesSource _bytesSource;
private boolean _closed;
private ConnectionEndpoint _conn;
+ private SocketExceptionHandler _exceptionHandler;
- public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn)
+ public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler)
{
_outputStream = outputStream;
_bytesSource = source;
_conn = conn;
+ _exceptionHandler = exceptionHandler;
}
public void run()
@@ -421,7 +423,7 @@ public class ConnectionHandler
catch (IOException e)
{
_closed = true;
- e.printStackTrace(); //TODO
+ _exceptionHandler.processSocketException(e);
}
}
}
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java Mon Oct 21 22:04:51 2013
@@ -184,7 +184,7 @@ public class FrameHandler implements Pro
// type
int type = in.get() & 0xFF;
- int channel = in.getShort() & 0xFF;
+ int channel = in.getShort() & 0xFFFF;
if(type != 0 && type != 1)
{
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Mon Oct 21 22:04:51 2013
@@ -68,22 +68,22 @@ public class ConnectionEndpoint implemen
private final Container _container;
private Principal _user;
- private static final short DEFAULT_CHANNEL_MAX = 255;
+ private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue();
private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
private ConnectionState _state = ConnectionState.UNOPENED;
- private short _channelMax;
+ private short _channelMax = DEFAULT_CHANNEL_MAX;
private int _maxFrameSize = 4096;
private String _remoteContainerId;
private SocketAddress _remoteAddress;
// positioned by the *outgoing* channel
- private SessionEndpoint[] _sendingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1];
+ private SessionEndpoint[] _sendingSessions;
// positioned by the *incoming* channel
- private SessionEndpoint[] _receivingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1];
+ private SessionEndpoint[] _receivingSessions;
private boolean _closedForInput;
private boolean _closedForOutput;
@@ -165,7 +165,7 @@ public class ConnectionEndpoint implemen
}
if (_state == ConnectionState.UNOPENED)
{
- sendOpen(DEFAULT_CHANNEL_MAX, DEFAULT_MAX_FRAME);
+ sendOpen(_channelMax, DEFAULT_MAX_FRAME);
_state = ConnectionState.AWAITING_OPEN;
}
}
@@ -183,10 +183,10 @@ public class ConnectionEndpoint implemen
public synchronized SessionEndpoint createSession(String name)
{
// todo assert connection state
- SessionEndpoint endpoint = new SessionEndpoint(this);
short channel = getFirstFreeChannel();
if (channel != -1)
{
+ SessionEndpoint endpoint = new SessionEndpoint(this);
_sendingSessions[channel] = endpoint;
endpoint.setSendingChannel(channel);
Begin begin = new Begin();
@@ -196,13 +196,14 @@ public class ConnectionEndpoint implemen
begin.setHandleMax(_handleMax);
send(channel, begin);
+ return endpoint;
}
else
{
- // todo error
+ // TODO - report error
+ return null;
}
- return endpoint;
}
@@ -235,7 +236,16 @@ public class ConnectionEndpoint implemen
{
Open open = new Open();
- open.setChannelMax(UnsignedShort.valueOf(DEFAULT_CHANNEL_MAX));
+ if(_receivingSessions == null)
+ {
+ _receivingSessions = new SessionEndpoint[channelMax+1];
+ _sendingSessions = new SessionEndpoint[channelMax+1];
+ }
+ if(channelMax < _channelMax)
+ {
+ _channelMax = channelMax;
+ }
+ open.setChannelMax(UnsignedShort.valueOf(channelMax));
open.setContainerId(_container.getId());
open.setMaxFrameSize(getDesiredMaxFrameSize());
open.setHostname(getRemoteHostname());
@@ -268,7 +278,7 @@ public class ConnectionEndpoint implemen
short getFirstFreeChannel()
{
- for (int i = 0; i < _sendingSessions.length; i++)
+ for (int i = 0; i <= _channelMax; i++)
{
if (_sendingSessions[i] == null)
{
@@ -288,10 +298,16 @@ public class ConnectionEndpoint implemen
public synchronized void receiveOpen(short channel, Open open)
{
- _channelMax = open.getChannelMax() == null ? DEFAULT_CHANNEL_MAX
- : open.getChannelMax().shortValue() < DEFAULT_CHANNEL_MAX
- ? DEFAULT_CHANNEL_MAX
- : open.getChannelMax().shortValue();
+ _channelMax = open.getChannelMax() == null ? _channelMax
+ : open.getChannelMax().shortValue() < _channelMax
+ ? open.getChannelMax().shortValue()
+ : _channelMax;
+
+ if(_receivingSessions == null)
+ {
+ _receivingSessions = new SessionEndpoint[_channelMax+1];
+ _sendingSessions = new SessionEndpoint[_channelMax+1];
+ }
UnsignedInteger remoteDesiredMaxFrameSize =
open.getMaxFrameSize() == null ? UnsignedInteger.valueOf(DEFAULT_MAX_FRAME) : open.getMaxFrameSize();
@@ -380,13 +396,30 @@ public class ConnectionEndpoint implemen
if (!_closedForInput)
{
_closedForInput = true;
- for (int i = 0; i < _receivingSessions.length; i++)
+ switch(_state)
+ {
+ case UNOPENED:
+ case AWAITING_OPEN:
+ case CLOSE_SENT:
+ _state = ConnectionState.CLOSED;
+ case OPEN:
+ _state = ConnectionState.CLOSE_RECEIVED;
+ case CLOSED:
+ // already sent our close - too late to do anything more
+ break;
+ default:
+ }
+
+ if(_receivingSessions != null)
{
- if (_receivingSessions[i] != null)
+ for (int i = 0; i < _receivingSessions.length; i++)
{
- _receivingSessions[i].end();
- _receivingSessions[i] = null;
+ if (_receivingSessions[i] != null)
+ {
+ _receivingSessions[i].end();
+ _receivingSessions[i] = null;
+ }
}
}
}
@@ -604,7 +637,6 @@ public class ConnectionEndpoint implemen
}
}
-
public void invalidHeaderReceived()
{
// TODO
@@ -984,4 +1016,9 @@ public class ConnectionEndpoint implemen
{
return _remoteError;
}
+
+ public void setChannelMax(final short channelMax)
+ {
+ _channelMax = channelMax;
+ }
}
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java Mon Oct 21 22:04:51 2013
@@ -444,13 +444,25 @@ public abstract class LinkEndpoint<T ext
sendFlow(_flowTransactionId != null);
}
+ public void sendFlowWithEcho()
+ {
+ sendFlow(_flowTransactionId != null, true);
+ }
+
+
public void sendFlow(boolean setTransactionId)
{
+ sendFlow(setTransactionId, false);
+ }
+
+ public void sendFlow(boolean setTransactionId, boolean echo)
+ {
if(_state == State.ATTACHED || _state == State.ATTACH_SENT)
{
Flow flow = new Flow();
flow.setLinkCredit(_linkCredit);
flow.setDeliveryCount(_deliveryCount);
+ flow.setEcho(echo);
_lastSentCreditLimit = _linkCredit.add(_deliveryCount);
flow.setAvailable(_available);
flow.setDrain(_drain);
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java Mon Oct 21 22:04:51 2013
@@ -288,7 +288,7 @@ public class ReceivingLinkEndpoint exten
setDrain(true);
_creditWindow = false;
_drainLimit = getDeliveryCount().add(getLinkCredit());
- sendFlow();
+ sendFlowWithEcho();
getLock().notifyAll();
}
}
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/codec/AMQPDescribedTypeRegistry.java Mon Oct 21 22:04:51 2013
@@ -49,6 +49,7 @@ import org.apache.qpid.amqp_1_0.codec.Un
import org.apache.qpid.amqp_1_0.codec.ValueWriter;
+import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.RestrictedType;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.codec.*;
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java Mon Oct 21 22:04:51 2013
@@ -152,7 +152,7 @@ public class Source
return _outcomes;
}
- public void setOutcomes(Symbol[] outcomes)
+ public void setOutcomes(Symbol... outcomes)
{
_outcomes = outcomes;
}
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java Mon Oct 21 22:04:51 2013
@@ -34,9 +34,10 @@ import java.util.List;
public class AcceptedConstructor extends DescribedTypeConstructor<Accepted>
{
+ public static final Symbol SYMBOL_CONSTRUCTOR = Symbol.valueOf("amqp:accepted:list");
private static final Object[] DESCRIPTORS =
{
- Symbol.valueOf("amqp:accepted:list"),UnsignedLong.valueOf(0x0000000000000024L),
+ SYMBOL_CONSTRUCTOR,UnsignedLong.valueOf(0x0000000000000024L),
};
private static final AcceptedConstructor INSTANCE = new AcceptedConstructor();
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java Mon Oct 21 22:04:51 2013
@@ -34,9 +34,10 @@ import java.util.List;
public class RejectedConstructor extends DescribedTypeConstructor<Rejected>
{
+ public static final Symbol SYMBOL_CONSTRUCTOR = Symbol.valueOf("amqp:rejected:list");
private static final Object[] DESCRIPTORS =
{
- Symbol.valueOf("amqp:rejected:list"),UnsignedLong.valueOf(0x0000000000000025L),
+ SYMBOL_CONSTRUCTOR,UnsignedLong.valueOf(0x0000000000000025L),
};
private static final RejectedConstructor INSTANCE = new RejectedConstructor();
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java Mon Oct 21 22:04:51 2013
@@ -231,7 +231,7 @@ public class SourceConstructor extends D
try
{
- obj.setDistributionMode( (DistributionMode) val );
+ obj.setDistributionMode( StdDistMode.valueOf(val) );
}
catch(ClassCastException e)
{
@@ -326,7 +326,7 @@ public class SourceConstructor extends D
// TODO Error
}
}
-
+
}
@@ -360,7 +360,7 @@ public class SourceConstructor extends D
// TODO Error
}
}
-
+
}
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java Mon Oct 21 22:04:51 2013
@@ -43,6 +43,8 @@ public class ConnectionError
public static final ConnectionError REDIRECT = new ConnectionError(Symbol.valueOf("amqp:connection:redirect"));
+ public static final ConnectionError SOCKET_ERROR = new ConnectionError(Symbol.valueOf("amqp:connection:socket-error"));
+
private ConnectionError(Symbol val)
@@ -73,6 +75,11 @@ public class ConnectionError
return "redirect";
}
+ if(this == SOCKET_ERROR)
+ {
+ return "socket-error";
+ }
+
else
{
return String.valueOf(_val);
@@ -97,6 +104,11 @@ public class ConnectionError
{
return REDIRECT;
}
+
+ if(SOCKET_ERROR._val.equals(val))
+ {
+ return SOCKET_ERROR;
+ }
// TODO ERROR
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org