You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/14 15:46:38 UTC
svn commit: r1558056 [2/4] - in /qpid/branches/java-broker-bdb-ha/qpid/java:
./ amqp-1-0-client-jms/
amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/
amqp-1-0-client-websocket/ amqp-1-0-client-websocket/resources/
amqp-1-0-client/ a...
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java Tue Jan 14 14:46:35 2014
@@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.codec.De
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Predicate;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
import org.apache.qpid.amqp_1_0.type.*;
@@ -39,10 +40,15 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
import org.apache.qpid.amqp_1_0.type.transport.Error;
public class Sender implements DeliveryStateHandler
{
+ private static final long UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER = 1000l;
+ private static final long DEFAULT_CREDIT_TIMEOUT = 30000l;
+
private SendingLinkEndpoint _endpoint;
private int _id;
private Session _session;
@@ -150,17 +156,26 @@ public class Sender implements DeliveryS
synchronized(_endpoint.getLock())
{
- while(!(_endpoint.isAttached() || _endpoint.isDetached()))
+ try
{
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- throw new SenderCreationException(e);
- }
+ _endpoint.waitUntil(new Predicate()
+ {
+ @Override
+ public boolean isSatisfied()
+ {
+ return _endpoint.isAttached() || _endpoint.isDetached();
+ }
+ });
+ }
+ catch (TimeoutException e)
+ {
+ throw new SenderCreationException(e);
}
+ catch (InterruptedException e)
+ {
+ throw new SenderCreationException(e);
+ }
+
if (session.getEndpoint().isEnded())
{
throw new SenderCreationException("Session is closed while creating link, target: " + target.getAddress());
@@ -225,22 +240,22 @@ public class Sender implements DeliveryS
return _endpoint.getTarget();
}
- public void send(Message message) throws LinkDetachedException
+ public void send(Message message) throws LinkDetachedException, TimeoutException
{
send(message, null, null);
}
- public void send(Message message, final OutcomeAction action) throws LinkDetachedException
+ public void send(Message message, final OutcomeAction action) throws LinkDetachedException, TimeoutException
{
send(message, null, action);
}
- public void send(Message message, final Transaction txn) throws LinkDetachedException
+ public void send(Message message, final Transaction txn) throws LinkDetachedException, TimeoutException
{
send(message, txn, null);
}
- public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
+ public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException, TimeoutException
{
List<Section> sections = message.getPayload();
@@ -290,19 +305,26 @@ public class Sender implements DeliveryS
xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
}
final Object lock = _endpoint.getLock();
+
synchronized(lock)
{
- while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
+
+ try
{
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
+ _endpoint.waitUntil(new Predicate()
+ {
+ @Override
+ public boolean isSatisfied()
+ {
+ return _endpoint.hasCreditToSend() || _endpoint.isDetached();
+ }
+ }, getCreditTimeout());
}
+ catch (InterruptedException e)
+ {
+ throw new TimeoutException("Interrupted while waiting for credit");
+ }
+
if(_endpoint.isDetached())
{
throw new LinkDetachedException(_error);
@@ -312,27 +334,24 @@ public class Sender implements DeliveryS
_outcomeActions.put(message.getDeliveryTag(), action);
}
_endpoint.transfer(xfr);
- //TODO - rationalise sending of flows
- // _endpoint.sendFlow();
}
if(_windowSize != 0)
{
- synchronized(lock)
+ try
{
-
-
- while(_endpoint.getUnsettledCount() >= _windowSize)
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
+ _endpoint.waitUntil(new Predicate()
+ {
+ @Override
+ public boolean isSatisfied()
+ {
+ return _endpoint.getUnsettledCount() < _windowSize;
+ }
+ }, getUnsettledTimeout());
+ }
+ catch (InterruptedException e)
+ {
+ throw new TimeoutException("Interrupted while waiting for the window to expand to allow sending");
}
}
@@ -340,48 +359,80 @@ public class Sender implements DeliveryS
}
+ private long getCreditTimeout()
+ {
+ return _endpoint.getSyncTimeout() < DEFAULT_CREDIT_TIMEOUT ? DEFAULT_CREDIT_TIMEOUT : _endpoint.getSyncTimeout();
+ }
+
public void close() throws SenderClosingException
{
+ boolean unsettledDeliveries = false;
if(_windowSize != 0)
{
- synchronized(_endpoint.getLock())
- {
-
+ long timeout = getUnsettledTimeout();
- while(_endpoint.getUnsettledCount() > 0)
+ try
+ {
+ _endpoint.waitUntil(new Predicate()
{
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
+ @Override
+ public boolean isSatisfied()
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return _endpoint.getUnsettledCount() == 0;
}
- }
+ }, timeout);
+ }
+ catch (InterruptedException e)
+ {
+ unsettledDeliveries = true;
+ }
+ catch (TimeoutException e)
+ {
+ unsettledDeliveries = true;
}
}
_session.removeSender(this);
_endpoint.setSource(null);
- _endpoint.detach();
+ _endpoint.close();
_closed = true;
- synchronized(_endpoint.getLock())
+ try
{
- while(!_endpoint.isDetached())
+ _endpoint.waitUntil(new Predicate()
{
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
+ @Override
+ public boolean isSatisfied()
{
- throw new SenderClosingException(e);
+ return _endpoint.isDetached();
}
- }
+ });
+ }
+ catch (TimeoutException e)
+ {
+ throw new SenderClosingException("Timed out attempting to detach link", e);
}
+ catch (InterruptedException e)
+ {
+ throw new SenderClosingException("Interrupted while attempting to detach link", e);
+ }
+ if(unsettledDeliveries && _endpoint.getUnsettledCount() > 0)
+ {
+ throw new SenderClosingException("Some messages may not have been received by the recipient");
+ }
+ }
+
+ private long getUnsettledTimeout()
+ {
+ long timeout = _endpoint.getSyncTimeout();
+
+ // give a generous timeout where there are unsettled messages
+ if(timeout < _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER)
+ {
+ timeout = _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER;
+ }
+ return timeout;
}
public boolean isClosed()
@@ -468,10 +519,20 @@ public class Sender implements DeliveryS
public class SenderClosingException extends Exception
{
+ public SenderClosingException(final String message, final Throwable cause)
+ {
+ super(message, cause);
+ }
+
public SenderClosingException(Throwable e)
{
super(e);
}
+
+ public SenderClosingException(final String message)
+ {
+ super(message);
+ }
}
public static interface OutcomeAction
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/amqp-1-0-common:r1549895-1558036
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java Tue Jan 14 14:46:35 2014
@@ -66,20 +66,7 @@ public class SymbolTypeConstructor exten
if(symbolVal == null)
{
ByteBuffer dup = in.duplicate();
- try
- {
- dup.limit(in.position()+size);
- }
- catch (IllegalArgumentException e)
- {
- System.err.println("in.position(): " + in.position());
- System.err.println("size: " + size);
- System.err.println("dup.position(): " + dup.position());
- System.err.println("dup.capacity(): " + dup.capacity());
- System.err.println("dup.limit(): " + dup.limit());
- throw e;
-
- }
+ dup.limit(in.position()+size);
CharBuffer charBuf = ASCII.decode(dup);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java Tue Jan 14 14:46:35 2014
@@ -65,6 +65,12 @@ public class ConnectionHandler
public boolean parse(ByteBuffer in)
{
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ Binary b = new Binary(in.array(),in.arrayOffset()+in.position(),in.remaining());
+ RAW_LOGGER.fine("RECV [" + _connection.getRemoteAddress() + "] : " + b.toString());
+ }
+
while(in.hasRemaining() && !isDone())
{
_delegate = _delegate.parse(in);
@@ -376,6 +382,47 @@ public class ConnectionHandler
}
+ public static class SequentialFrameSource implements FrameSource
+ {
+ private Queue<FrameSource> _sources = new LinkedList<FrameSource>();
+
+ public SequentialFrameSource(FrameSource... sources)
+ {
+ _sources.addAll(Arrays.asList(sources));
+ }
+
+ public synchronized void addSource(FrameSource source)
+ {
+ _sources.add(source);
+ }
+
+ @Override
+ public synchronized AMQFrame getNextFrame(final boolean wait)
+ {
+ FrameSource src = _sources.peek();
+ while (src != null && src.closed())
+ {
+ _sources.poll();
+ src = _sources.peek();
+ }
+
+ if(src != null)
+ {
+ return src.getNextFrame(wait);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public boolean closed()
+ {
+ return _sources.isEmpty();
+ }
+ }
+
+
public static class BytesOutputHandler implements Runnable, BytesProcessor
{
@@ -383,28 +430,28 @@ public class ConnectionHandler
private BytesSource _bytesSource;
private boolean _closed;
private ConnectionEndpoint _conn;
- private SocketExceptionHandler _exceptionHandler;
-
- public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler)
- {
- _outputStream = outputStream;
- _bytesSource = source;
- _conn = conn;
- _exceptionHandler = exceptionHandler;
- }
+ private ExceptionHandler _exceptionHandler;
- public void run()
- {
+ public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, ExceptionHandler exceptionHandler)
+ {
+ _outputStream = outputStream;
+ _bytesSource = source;
+ _conn = conn;
+ _exceptionHandler = exceptionHandler;
+ }
- final BytesSource bytesSource = _bytesSource;
+ public void run()
+ {
- while(!(_closed || bytesSource.closed()))
- {
- _bytesSource.getBytes(this, true);
- }
+ final BytesSource bytesSource = _bytesSource;
+ while(!(_closed || bytesSource.closed()))
+ {
+ _bytesSource.getBytes(this, true);
}
+ }
+
public void processBytes(final ByteBuffer buf)
{
try
@@ -423,7 +470,7 @@ public class ConnectionHandler
catch (IOException e)
{
_closed = true;
- _exceptionHandler.processSocketException(e);
+ _exceptionHandler.handleException(e);
}
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java Tue Jan 14 14:46:35 2014
@@ -127,9 +127,9 @@ public class FrameHandler implements Pro
break;
}
- else if(size > _connection.getMaxFrameSize())
+ else if(size > _connection.getDesiredMaxFrameSize().intValue())
{
- frameParsingError = createFramingError("specified frame size %d larger than maximum frame header size %d", size, _connection.getMaxFrameSize());
+ frameParsingError = createFramingError("specified frame size %d larger than maximum frame header size %d", size, _connection.getDesiredMaxFrameSize().intValue());
state = State.ERROR;
break;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Tue Jan 14 14:46:35 2014
@@ -40,10 +40,8 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslServerFactory;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -51,7 +49,7 @@ import java.nio.charset.Charset;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Enumeration;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -71,6 +69,7 @@ public class ConnectionEndpoint implemen
private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue();
private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
+ private static final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqp.connection_sync_timeout",5000l);
private ConnectionState _state = ConnectionState.UNOPENED;
@@ -104,7 +103,7 @@ public class ConnectionEndpoint implemen
private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE;
private ConnectionEventListener _connectionEventListener = ConnectionEventListener.DEFAULT;
private String _password;
- private final boolean _requiresSASLClient;
+ private boolean _requiresSASLClient;
private final boolean _requiresSASLServer;
@@ -122,6 +121,7 @@ public class ConnectionEndpoint implemen
private Error _remoteError;
private Map _properties;
+ private long _syncTimeout = DEFAULT_SYNC_TIMEOUT;
public ConnectionEndpoint(Container container, SaslServerProvider cbs)
{
@@ -140,6 +140,14 @@ public class ConnectionEndpoint implemen
_requiresSASLServer = false;
}
+ public void setPrincipal(Principal user)
+ {
+ if(_user == null)
+ {
+ _user = user;
+ _requiresSASLClient = user != null;
+ }
+ }
public synchronized void open()
{
@@ -1054,4 +1062,42 @@ public class ConnectionEndpoint implemen
{
_channelMax = channelMax;
}
+
+ public long getSyncTimeout()
+ {
+ return _syncTimeout;
+ }
+
+ public void setSyncTimeout(final long syncTimeout)
+ {
+ _syncTimeout = syncTimeout;
+ }
+
+ public void waitUntil(Predicate predicate) throws InterruptedException, TimeoutException
+ {
+ waitUntil(predicate, _syncTimeout);
+ }
+
+ public void waitUntil(Predicate predicate, long timeout) throws InterruptedException, TimeoutException
+ {
+ long endTime = System.currentTimeMillis() + timeout;
+
+ synchronized(getLock())
+ {
+ while(!predicate.isSatisfied())
+ {
+ getLock().wait(timeout);
+
+ if(!predicate.isSatisfied())
+ {
+ timeout = endTime - System.currentTimeMillis();
+ if(timeout <= 0l)
+ {
+ throw new TimeoutException();
+ }
+ }
+ }
+ }
+
+ }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java Tue Jan 14 14:46:35 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.amqp_1_0.type.tra
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeoutException;
public abstract class LinkEndpoint<T extends LinkEventListener>
{
@@ -324,6 +325,23 @@ public abstract class LinkEndpoint<T ext
return _session.getLock();
}
+
+ public long getSyncTimeout()
+ {
+ return _session.getSyncTimeout();
+ }
+
+ public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException
+ {
+ _session.waitUntil(predicate);
+ }
+
+ public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException
+ {
+ _session.waitUntil(predicate, timeout);
+ }
+
+
public void attach()
{
synchronized(getLock())
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Tue Jan 14 14:46:35 2014
@@ -35,6 +35,7 @@ import org.apache.qpid.amqp_1_0.type.tra
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.TimeoutException;
public class SessionEndpoint
{
@@ -579,19 +580,7 @@ public class SessionEndpoint
if(payload != null && payloadSent < payload.remaining())
{
payload = payload.duplicate();
-try
-{
payload.position(payload.position()+payloadSent);
-}
-catch(IllegalArgumentException e)
-{
- System.err.println("UNEXPECTED");
- System.err.println("Payload Position: " + payload.position());
- System.err.println("Payload Sent: " + payloadSent);
- System.err.println("Payload Remaining: " + payload.remaining());
- throw e;
-
-}
Transfer secondTransfer = new Transfer();
@@ -618,6 +607,23 @@ catch(IllegalArgumentException e)
return _connection.getLock();
}
+
+ public long getSyncTimeout()
+ {
+ return _connection.getSyncTimeout();
+ }
+
+ public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException
+ {
+ _connection.waitUntil(predicate);
+ }
+
+ public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException
+ {
+ _connection.waitUntil(predicate, timeout);
+ }
+
+
public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
String targetAddr,
String sourceAddr,
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java Tue Jan 14 14:46:35 2014
@@ -31,8 +31,7 @@ import java.util.Map;
import org.apache.qpid.amqp_1_0.type.*;
public class Error
- {
-
+{
private ErrorCondition _condition;
@@ -40,6 +39,16 @@ public class Error
private Map _info;
+ public Error()
+ {
+ }
+
+ public Error(final ErrorCondition condition, final String description)
+ {
+ _condition = condition;
+ _description = description;
+ }
+
public ErrorCondition getCondition()
{
return _condition;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Tue Jan 14 14:46:35 2014
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueableMessage;
@@ -84,6 +85,8 @@ public class BDBMessageStore implements
private static final String[] DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, MESSAGE_META_DATA_DB_NAME,
MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME, CONFIG_VERSION_DB_NAME };
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
private EnvironmentFacade _environmentFacade;
private final AtomicLong _messageId = new AtomicLong(0);
@@ -282,16 +285,19 @@ public class BDBMessageStore implements
@Override
public void close() throws AMQStoreException
{
- _stateManager.attainState(State.CLOSING);
- try
- {
- closeEnvironment();
- }
- catch(DatabaseException e)
+ if (_closed.compareAndSet(false, true))
{
- throw new AMQStoreException("Exception occured on message store close", e);
+ _stateManager.attainState(State.CLOSING);
+ try
+ {
+ closeEnvironment();
+ }
+ catch(DatabaseException e)
+ {
+ throw new AMQStoreException("Exception occured on message store close", e);
+ }
+ _stateManager.attainState(State.CLOSED);
}
- _stateManager.attainState(State.CLOSED);
}
private void closeEnvironment()
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html Tue Jan 14 14:46:35 2014
@@ -1,3 +1,19 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
<table class="tableContainer-table tableContainer-table-horiz">
<tr>
<td class="tableContainer-labelCell" style="width: 300px;"><strong>Path to store location*: </strong></td>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/store/bdb/add.html
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/store/bdb/add.html?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/store/bdb/add.html (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/store/bdb/add.html Tue Jan 14 14:46:35 2014
@@ -1,3 +1,19 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
<table class="tableContainer-table tableContainer-table-horiz">
<tr>
<td class="tableContainer-labelCell" style="width: 300px;"><strong>Path to store location*: </strong></td>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java Tue Jan 14 14:46:35 2014
@@ -215,12 +215,10 @@ public class HAClusterManagementTest ext
catch(RuntimeException rte)
{
//check cause was BDBs EnvironmentFailureException
- boolean isExpectedException = rte.getMessage().contains(EnvironmentFailureException.class.getName());
- if (!isExpectedException)
- {
- rte.printStackTrace();
- }
- assertTrue("Unexpected exception message:" + rte.getMessage(), isExpectedException);
+ assertTrue("Message '"+rte.getMessage()+"' does not contain '"
+ + EnvironmentFailureException.class.getName()
+ + "'.",
+ rte.getMessage().contains(EnvironmentFailureException.class.getName()));
// PASS
}
}
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker-core:r1549895-1558036
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java Tue Jan 14 14:46:35 2014
@@ -29,8 +29,6 @@ import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.ConfigurationEntryStore;
import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator;
import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
@@ -125,6 +123,7 @@ public class Broker
}
catch(Exception e)
{
+ LOGGER.fatal("Exception during startup", e);
try
{
_applicationRegistry.close();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Jan 14 14:46:35 2014
@@ -25,7 +25,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Tue Jan 14 14:46:35 2014
@@ -31,7 +31,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Tue Jan 14 14:46:35 2014
@@ -26,7 +26,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.plugin.QpidServiceLoader;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Tue Jan 14 14:46:35 2014
@@ -26,11 +26,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java Tue Jan 14 14:46:35 2014
@@ -24,7 +24,6 @@ import java.util.UUID;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java Tue Jan 14 14:46:35 2014
@@ -21,7 +21,6 @@
package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.plugin.ExchangeType;
import java.util.Collection;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Tue Jan 14 14:46:35 2014
@@ -21,7 +21,6 @@
package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import java.util.Collection;
import java.util.UUID;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Tue Jan 14 14:46:35 2014
@@ -22,17 +22,12 @@ package org.apache.qpid.server.exchange;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
@@ -41,7 +36,6 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.BaseQueue;
import java.util.ArrayList;
-import java.util.concurrent.ConcurrentHashMap;
public class FanoutExchange extends AbstractExchange
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java Tue Jan 14 14:46:35 2014
@@ -24,7 +24,6 @@ import java.util.UUID;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java Tue Jan 14 14:46:35 2014
@@ -30,7 +30,6 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Tue Jan 14 14:46:35 2014
@@ -23,8 +23,6 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.framing.AMQTypedValue;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.AMQMessageHeader;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Jan 14 14:46:35 2014
@@ -22,10 +22,7 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java Tue Jan 14 14:46:35 2014
@@ -24,7 +24,6 @@ import java.util.UUID;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Tue Jan 14 14:46:35 2014
@@ -29,8 +29,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java Tue Jan 14 14:46:35 2014
@@ -24,7 +24,6 @@ import java.util.UUID;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Tue Jan 14 14:46:35 2014
@@ -28,7 +28,6 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.framing.FieldTable;
import java.util.Map;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java Tue Jan 14 14:46:35 2014
@@ -79,6 +79,7 @@ public interface Connection extends Conf
public static final String REMOTE_PROCESS_PID = "remoteProcessPid";
public static final String SESSION_COUNT_LIMIT = "sessionCountLimit";
public static final String TRANSPORT = "transport";
+ /** Name of port associated with the connection */
public static final String PORT = "port";
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java Tue Jan 14 14:46:35 2014
@@ -25,7 +25,10 @@ import java.util.EnumSet;
public enum Transport
{
TCP,
- SSL;
+ SSL,
+ WS,
+ WSS,
+ SCTP;
public static Transport valueOfObject(Object transportObject)
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java Tue Jan 14 14:46:35 2014
@@ -19,12 +19,8 @@
*/
package org.apache.qpid.server.model.adapter;
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
-import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@@ -42,21 +38,21 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.KeyStore;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.plugin.TransportProviderFactory;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.TransportProvider;
import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
public class AmqpPortAdapter extends PortAdapter
{
private final Broker _broker;
- private IncomingNetworkTransport _transport;
+ private AcceptingTransport _transport;
public AmqpPortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaultAttributes, TaskExecutor taskExecutor)
{
@@ -70,42 +66,36 @@ public class AmqpPortAdapter extends Por
Collection<Transport> transports = getTransports();
Set<AmqpProtocolVersion> supported = convertFromModelProtocolsToAmqp(getProtocols());
- SSLContext sslContext = null;
- if (transports.contains(Transport.SSL))
+ TransportProvider transportProvider = null;
+ final HashSet<Transport> transportSet = new HashSet<Transport>(transports);
+ for(TransportProviderFactory tpf : (new QpidServiceLoader<TransportProviderFactory>()).instancesOf(TransportProviderFactory.class))
{
- sslContext = createSslContext();
+ if(tpf.getSupportedTransports().contains(transports))
+ {
+ transportProvider = tpf.getTransportProvider(transportSet);
+ }
}
- AmqpProtocolVersion defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
-
- String bindingAddress = (String) getAttribute(Port.BINDING_ADDRESS);
- if (WILDCARD_ADDRESS.equals(bindingAddress))
+ if(transportProvider == null)
{
- bindingAddress = null;
+ throw new IllegalConfigurationException("No transport providers found which can satisfy the requirement to support the transports: " + transports);
}
- Integer port = (Integer) getAttribute(Port.PORT);
- InetSocketAddress bindingSocketAddress = null;
- if ( bindingAddress == null )
- {
- bindingSocketAddress = new InetSocketAddress(port);
- }
- else
+
+ SSLContext sslContext = null;
+ if (transports.contains(Transport.SSL) || transports.contains(Transport.WSS))
{
- bindingSocketAddress = new InetSocketAddress(bindingAddress, port);
+ sslContext = createSslContext();
}
- final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(
- bindingSocketAddress, (Boolean)getAttribute(TCP_NO_DELAY),
- (Integer)getAttribute(SEND_BUFFER_SIZE), (Integer)getAttribute(RECEIVE_BUFFER_SIZE),
- (Boolean)getAttribute(NEED_CLIENT_AUTH), (Boolean)getAttribute(WANT_CLIENT_AUTH));
-
- _transport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
- final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory(
- _broker, transports.contains(Transport.TCP) ? sslContext : null,
- settings.wantClientAuth(), settings.needClientAuth(),
- supported, defaultSupportedProtocolReply, this, transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
+ AmqpProtocolVersion defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
- _transport.accept(settings, protocolEngineFactory, transports.contains(Transport.TCP) ? null : sslContext);
+ _transport = transportProvider.createTransport(transportSet,
+ sslContext,
+ this,
+ supported,
+ defaultSupportedProtocolReply);
+
+ _transport.start();
for(Transport transport : getTransports())
{
CurrentActor.get().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort()));
@@ -210,68 +200,4 @@ public class AmqpPortAdapter extends Por
}
return null;
}
-
- class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration
- {
- private final InetSocketAddress _bindingSocketAddress;
- private final Boolean _tcpNoDelay;
- private final Integer _sendBufferSize;
- private final Integer _receiveBufferSize;
- private final boolean _needClientAuth;
- private final boolean _wantClientAuth;
-
- public ServerNetworkTransportConfiguration(
- InetSocketAddress bindingSocketAddress, boolean tcpNoDelay,
- int sendBufferSize, int receiveBufferSize,
- boolean needClientAuth, boolean wantClientAuth)
- {
- _bindingSocketAddress = bindingSocketAddress;
- _tcpNoDelay = tcpNoDelay;
- _sendBufferSize = sendBufferSize;
- _receiveBufferSize = receiveBufferSize;
- _needClientAuth = needClientAuth;
- _wantClientAuth = wantClientAuth;
- }
-
- @Override
- public boolean wantClientAuth()
- {
- return _wantClientAuth;
- }
-
- @Override
- public boolean needClientAuth()
- {
- return _needClientAuth;
- }
-
- @Override
- public Boolean getTcpNoDelay()
- {
- return _tcpNoDelay;
- }
-
- @Override
- public Integer getSendBufferSize()
- {
- return _sendBufferSize;
- }
-
- @Override
- public Integer getReceiveBufferSize()
- {
- return _receiveBufferSize;
- }
-
- @Override
- public InetSocketAddress getAddress()
- {
- return _bindingSocketAddress;
- }
- };
-
- public String toString()
- {
- return getName();
- }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Tue Jan 14 14:46:35 2014
@@ -35,6 +35,7 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
@@ -226,7 +227,8 @@ final class ConnectionAdapter extends Ab
}
else if(name.equals(PORT))
{
- return String.valueOf(_connection.getPort());
+ Port port = _connection.getPort();
+ return String.valueOf(port == null ? null : port.getName());
}
return super.getAttribute(name);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java Tue Jan 14 14:46:35 2014
@@ -556,4 +556,11 @@ public class PortAdapter extends Abstrac
return trustStores;
}
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + " [id=" + getId() + ", name=" + getName() + ", port=" + getPort() + "]";
+ }
+
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java Tue Jan 14 14:46:35 2014
@@ -111,7 +111,7 @@ public class PortFactory
throw new IllegalConfigurationException("Can't create port which requests SSL client certificates but has no trust stores configured.");
}
- if(useClientAuth && !port.getTransports().contains(Transport.SSL))
+ if(useClientAuth && !(port.getTransports().contains(Transport.SSL) || port.getTransports().contains(Transport.WSS)))
{
throw new IllegalConfigurationException("Can't create port which requests SSL client certificates but doesn't use SSL transport.");
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java Tue Jan 14 14:46:35 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.plugin;
import java.util.UUID;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Tue Jan 14 14:46:35 2014
@@ -30,6 +30,8 @@ import java.util.Set;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSocket;
+
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -38,6 +40,7 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.security.SSLStatus;
@@ -143,11 +146,6 @@ public class MultiVersionProtocolEngine
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
- public void setNetworkConnection(NetworkConnection networkConnection)
- {
- setNetworkConnection(networkConnection, networkConnection.getSender());
- }
-
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
{
_network = network;
@@ -274,9 +272,9 @@ public class MultiVersionProtocolEngine
public void received(ByteBuffer msg)
{
-
_lastReadTime = System.currentTimeMillis();
- ByteBuffer msgheader = msg.duplicate();
+ ByteBuffer msgheader = msg.duplicate().slice();
+
if(_header.remaining() > msgheader.limit())
{
msg.position(msg.limit());
@@ -329,6 +327,7 @@ public class MultiVersionProtocolEngine
}
}
+
if(newDelegate == null && looksLikeSSL(headerBytes))
{
if(_sslContext != null)
@@ -475,7 +474,7 @@ public class MultiVersionProtocolEngine
SSLStatus sslStatus = new SSLStatus();
_sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus);
_sslSender = new SSLBufferingSender(_engine,_sender,sslStatus);
- _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender));
+ _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender);
}
@Override
@@ -592,6 +591,9 @@ public class MultiVersionProtocolEngine
private final NetworkConnection _network;
private final SSLBufferingSender _sslSender;
private final SSLEngine _engine;
+ private Principal _principal;
+ private boolean _principalChecked;
+ private final Object _lock = new Object();
public SSLNetworkConnection(SSLEngine engine, NetworkConnection network,
SSLBufferingSender sslSender)
@@ -647,21 +649,25 @@ public class MultiVersionProtocolEngine
}
@Override
- public void setPeerPrincipal(Principal principal)
- {
- _network.setPeerPrincipal(principal);
- }
-
- @Override
public Principal getPeerPrincipal()
{
- try
- {
- return _engine.getSession().getPeerPrincipal();
- }
- catch (SSLPeerUnverifiedException e)
+ synchronized (_lock)
{
- return null;
+ if(!_principalChecked)
+ {
+ try
+ {
+ _principal = _engine.getSession().getPeerPrincipal();
+ }
+ catch (SSLPeerUnverifiedException e)
+ {
+ _principal = null;
+ }
+
+ _principalChecked = true;
+ }
+
+ return _principal;
}
}
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue:r1549895-1558036
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Jan 14 14:46:35 2014
@@ -21,7 +21,6 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
@@ -35,7 +34,6 @@ import org.apache.qpid.server.virtualhos
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Set;
public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Tue Jan 14 14:46:35 2014
@@ -22,7 +22,6 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java Tue Jan 14 14:46:35 2014
@@ -21,7 +21,6 @@
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InboundMessage;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Jan 14 14:46:35 2014
@@ -38,7 +38,6 @@ import java.util.concurrent.atomic.Atomi
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -734,7 +733,7 @@ public class SimpleAMQQueue implements A
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
- if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub)))
+ if (sub.acquires() && !assign(sub, entry))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
@@ -755,10 +754,18 @@ public class SimpleAMQQueue implements A
private boolean assign(final Subscription sub, final QueueEntry entry)
{
- return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry);
+ if(_messageGroupManager == null)
+ {
+ //no grouping, try to acquire immediately.
+ return entry.acquire(sub);
+ }
+ else
+ {
+ //the group manager is responsible for acquiring the message if/when appropriate
+ return _messageGroupManager.acceptMessage(sub, entry);
+ }
}
-
private boolean mightAssign(final Subscription sub, final QueueEntry entry)
{
if(_messageGroupManager == null || !sub.acquires())
@@ -1646,7 +1653,7 @@ public class SimpleAMQQueue implements A
{
if (!sub.wouldSuspend(node))
{
- if (sub.acquires() && !(assign(sub, node) && node.acquire(sub)))
+ if (sub.acquires() && !assign(sub, node))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SubjectCreator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SubjectCreator.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SubjectCreator.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SubjectCreator.java Tue Jan 14 14:46:35 2014
@@ -37,6 +37,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManager;
/**
* Creates a {@link Subject} formed by the {@link Principal}'s returned from:
@@ -129,6 +130,17 @@ public class SubjectCreator
}
}
+ public Subject createSubjectWithGroups(Principal principal)
+ {
+ Subject authenticationSubject = new Subject();
+
+ authenticationSubject.getPrincipals().add(principal);
+ authenticationSubject.getPrincipals().addAll(getGroupPrincipals(principal.getName()));
+ authenticationSubject.setReadOnly();
+
+ return authenticationSubject;
+ }
+
public Subject createSubjectWithGroups(String username)
{
Subject authenticationSubject = new Subject();
@@ -159,4 +171,9 @@ public class SubjectCreator
{
return _authenticationManager instanceof AnonymousAuthenticationManager;
}
+
+ public boolean isExternalAuthenticationAllowed()
+ {
+ return _authenticationManager instanceof ExternalAuthenticationManager;
+ }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Tue Jan 14 14:46:35 2014
@@ -79,7 +79,7 @@ abstract public class AbstractJDBCMessag
private static final int DB_VERSION = 7;
private final AtomicLong _messageId = new AtomicLong(0);
- private AtomicBoolean _closed = new AtomicBoolean(false);
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
@@ -683,12 +683,14 @@ abstract public class AbstractJDBCMessag
@Override
public void close() throws AMQStoreException
{
- _closed.getAndSet(true);
- _stateManager.attainState(State.CLOSING);
+ if (_closed.compareAndSet(false, true))
+ {
+ _stateManager.attainState(State.CLOSING);
- doClose();
+ doClose();
- _stateManager.attainState(State.CLOSED);
+ _stateManager.attainState(State.CLOSED);
+ }
}
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java:r1549895-1558036
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Tue Jan 14 14:46:35 2014
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.framing.FieldTable;
-
-import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java Tue Jan 14 14:46:35 2014
@@ -29,14 +29,11 @@ import java.util.Map;
import java.util.Set;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
public class DurableConfigurationStoreHelper
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java Tue Jan 14 14:46:35 2014
@@ -63,6 +63,18 @@ public class AssignedSubscriptionMessage
public boolean acceptMessage(Subscription sub, QueueEntry entry)
{
+ if(assignMessage(sub, entry))
+ {
+ return entry.acquire(sub);
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ private boolean assignMessage(Subscription sub, QueueEntry entry)
+ {
Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
if(groupVal == null)
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java Tue Jan 14 14:46:35 2014
@@ -136,9 +136,21 @@ public class DefinedGroupMessageGroupMan
public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry)
{
+ if(assignMessage(sub, entry))
+ {
+ return entry.acquire(sub);
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ private boolean assignMessage(final Subscription sub, final QueueEntry entry)
+ {
Object groupId = getKey(entry);
Group group = _groupMap.get(groupId);
-
+
if(group == null || !group.isValid())
{
group = new Group(groupId, sub);
@@ -152,11 +164,10 @@ public class DefinedGroupMessageGroupMan
{
return false;
}
-
}
-
+
Subscription assignedSub = group.getSubscription();
-
+
if(assignedSub == sub)
{
entry.addStateChangeListener(new GroupStateChangeListener(group, entry));
@@ -167,8 +178,7 @@ public class DefinedGroupMessageGroupMan
return false;
}
}
-
-
+
public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub)
{
EntryFinder visitor = new EntryFinder(sub);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org