You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/11/12 09:10:34 UTC
[1/3] git commit: Fix for
https://issues.apache.org/jira/browse/AMQ-4719
Updated Branches:
refs/heads/trunk bc9751ac2 -> d2ddd1dca
Fix for https://issues.apache.org/jira/browse/AMQ-4719
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7e000d5a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7e000d5a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7e000d5a
Branch: refs/heads/trunk
Commit: 7e000d5a40f8bbb817dec42a8f9fd03de0f972d7
Parents: bc9751a
Author: Rob Davies <ra...@gmail.com>
Authored: Tue Nov 12 08:07:33 2013 +0000
Committer: Rob Davies <ra...@gmail.com>
Committed: Tue Nov 12 08:07:33 2013 +0000
----------------------------------------------------------------------
.../org/apache/activemq/transport/TransportServer.java | 7 +++++++
.../activemq/transport/TransportServerFilter.java | 4 ++++
.../activemq/transport/tcp/TcpTransportServer.java | 13 ++++++++++++-
.../activemq/transport/udp/UdpTransportServer.java | 10 ++++++++++
4 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/7e000d5a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
index 27b8572..fb25f4f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
@@ -65,4 +65,11 @@ public interface TransportServer extends Service {
* connections.
*/
boolean isSslServer();
+
+ /**
+ * Some protocols allow link stealing by default (if 2 connections have the same clientID - the youngest wins).
+ * This is the default for AMQP and MQTT. However, JMS 1.1 spec requires the opposite
+ * @return
+ */
+ boolean isAllowLinkStealing();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7e000d5a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
index 2a06a57..e308774 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
@@ -59,4 +59,8 @@ public class TransportServerFilter implements TransportServer {
public boolean isSslServer() {
return next.isSslServer();
}
+
+ public boolean isAllowLinkStealing() {
+ return next.isAllowLinkStealing();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7e000d5a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index 310e9eb..5e1426a 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -72,6 +72,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
protected long maxInactivityDurationInitalDelay = 10000;
protected int minmumWireFormatVersion;
protected boolean useQueueForAccept = true;
+ protected boolean allowLinkStealing;
+
/**
* trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
@@ -343,7 +345,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
/**
* @param socket
- * @param inetAddress
+ * @param bindAddress
* @return real hostName
* @throws UnknownHostException
*/
@@ -511,4 +513,13 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
public boolean isSslServer() {
return false;
}
+
+ @Override
+ public boolean isAllowLinkStealing() {
+ return allowLinkStealing;
+ }
+
+ public void setAllowLinkStealing(boolean allowLinkStealing) {
+ this.allowLinkStealing = allowLinkStealing;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7e000d5a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
index 79b140a..ccf7abe 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
@@ -52,6 +52,7 @@ public class UdpTransportServer extends TransportServerSupport {
private final Transport configuredTransport;
private boolean usingWireFormatNegotiation;
private final Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>();
+ private boolean allowLinkStealing;
public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) {
super(connectURI);
@@ -189,4 +190,13 @@ public class UdpTransportServer extends TransportServerSupport {
public boolean isSslServer() {
return false;
}
+
+ @Override
+ public boolean isAllowLinkStealing() {
+ return allowLinkStealing;
+ }
+
+ public void setAllowLinkStealing(boolean allowLinkStealing) {
+ this.allowLinkStealing = allowLinkStealing;
+ }
}
[3/3] git commit: Fix for
https://issues.apache.org/jira/browse/AMQ-4719
Posted by ra...@apache.org.
Fix for https://issues.apache.org/jira/browse/AMQ-4719
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d2ddd1dc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d2ddd1dc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d2ddd1dc
Branch: refs/heads/trunk
Commit: d2ddd1dcadffc6f16901f634a6b04314f48f9d8d
Parents: 47d1985
Author: Rob Davies <ra...@gmail.com>
Authored: Tue Nov 12 08:08:42 2013 +0000
Committer: Rob Davies <ra...@gmail.com>
Committed: Tue Nov 12 08:08:42 2013 +0000
----------------------------------------------------------------------
.../transport/mqtt/MQTTNIOSSLTransportFactory.java | 4 +++-
.../transport/mqtt/MQTTNIOTransportFactory.java | 4 +++-
.../activemq/transport/mqtt/MQTTTransportFactory.java | 13 +++++++++++++
3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/d2ddd1dc/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
index b9dfaba..f13b537 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
@@ -38,7 +38,7 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
- return new TcpTransportServer(this, location, serverSocketFactory) {
+ TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket);
if (context != null) {
@@ -47,6 +47,8 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
return transport;
}
};
+ result.setAllowLinkStealing(true);
+ return result;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/d2ddd1dc/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
index f18e900..52fa228 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
@@ -49,11 +49,13 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
}
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
- return new TcpTransportServer(this, location, serverSocketFactory) {
+ TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new MQTTNIOTransport(format, socket);
}
};
+ result.setAllowLinkStealing(true);
+ return result;
}
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
http://git-wip-us.apache.org/repos/asf/activemq/blob/d2ddd1dc/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
index de50cf2..7b4696a 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
@@ -16,6 +16,10 @@
*/
package org.apache.activemq.transport.mqtt;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
@@ -25,9 +29,12 @@ import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
+import javax.net.ServerSocketFactory;
+
/**
* A <a href="http://mqtt.org/">MQTT</a> transport factory
*/
@@ -39,6 +46,12 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
return "mqtt";
}
+ protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+ TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory);
+ result.setAllowLinkStealing(true);
+ return result;
+ }
+
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new MQTTTransportFilter(transport, format, brokerContext);
[2/3] git commit: Fix for
https://issues.apache.org/jira/browse/AMQ-4719
Posted by ra...@apache.org.
Fix for https://issues.apache.org/jira/browse/AMQ-4719
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/47d19851
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/47d19851
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/47d19851
Branch: refs/heads/trunk
Commit: 47d198513915532e1e8a3bcb0f9a6f98f5a794dd
Parents: 7e000d5
Author: Rob Davies <ra...@gmail.com>
Authored: Tue Nov 12 08:08:07 2013 +0000
Committer: Rob Davies <ra...@gmail.com>
Committed: Tue Nov 12 08:08:07 2013 +0000
----------------------------------------------------------------------
.../org/apache/activemq/broker/TransportConnector.java | 4 ++++
.../apache/activemq/transport/vm/VMTransportServer.java | 10 ++++++++++
2 files changed, 14 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/47d19851/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
index 4e31ee0..582bc3f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
@@ -90,6 +90,9 @@ public class TransportConnector implements Connector, BrokerServiceAware {
setEnableStatusMonitor(false);
}
}
+ if (server != null){
+ setAllowLinkStealing(server.isAllowLinkStealing());
+ }
}
/**
@@ -123,6 +126,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
+ rc.setAllowLinkStealing(isAllowLinkStealing());
return rc;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/47d19851/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
index b3dd21d..2f3d519 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
@@ -39,6 +39,7 @@ public class VMTransportServer implements TransportServer {
private final AtomicInteger connectionCount = new AtomicInteger(0);
private final boolean disposeOnDisconnect;
+ private boolean allowLinkStealing;
/**
* @param location
@@ -142,4 +143,13 @@ public class VMTransportServer implements TransportServer {
public boolean isSslServer() {
return false;
}
+
+ @Override
+ public boolean isAllowLinkStealing() {
+ return allowLinkStealing;
+ }
+
+ public void setAllowLinkStealing(boolean allowLinkStealing) {
+ this.allowLinkStealing = allowLinkStealing;
+ }
}