You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/11/12 09:10:34 UTC

[1/3] git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4719

Updated Branches:
  refs/heads/trunk bc9751ac2 -> d2ddd1dca


Fix for https://issues.apache.org/jira/browse/AMQ-4719


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7e000d5a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7e000d5a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7e000d5a

Branch: refs/heads/trunk
Commit: 7e000d5a40f8bbb817dec42a8f9fd03de0f972d7
Parents: bc9751a
Author: Rob Davies <ra...@gmail.com>
Authored: Tue Nov 12 08:07:33 2013 +0000
Committer: Rob Davies <ra...@gmail.com>
Committed: Tue Nov 12 08:07:33 2013 +0000

----------------------------------------------------------------------
 .../org/apache/activemq/transport/TransportServer.java |  7 +++++++
 .../activemq/transport/TransportServerFilter.java      |  4 ++++
 .../activemq/transport/tcp/TcpTransportServer.java     | 13 ++++++++++++-
 .../activemq/transport/udp/UdpTransportServer.java     | 10 ++++++++++
 4 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7e000d5a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
index 27b8572..fb25f4f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java
@@ -65,4 +65,11 @@ public interface TransportServer extends Service {
      *          connections.
      */
     boolean isSslServer();
+
+    /**
+     * Some protocols allow link stealing by default (if two connections have the same clientID, the youngest wins).
+     * This is the default for AMQP and MQTT. However, the JMS 1.1 spec requires the opposite.
+     * @return true if link stealing is allowed, false otherwise
+     */
+    boolean isAllowLinkStealing();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7e000d5a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
index 2a06a57..e308774 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java
@@ -59,4 +59,8 @@ public class TransportServerFilter implements TransportServer {
     public boolean isSslServer() {
         return next.isSslServer();
     }
+
+    public boolean isAllowLinkStealing() {
+        return next.isAllowLinkStealing();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7e000d5a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index 310e9eb..5e1426a 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -72,6 +72,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
     protected long maxInactivityDurationInitalDelay = 10000;
     protected int minmumWireFormatVersion;
     protected boolean useQueueForAccept = true;
+    protected boolean allowLinkStealing;
+
 
     /**
      * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
@@ -343,7 +345,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
 
     /**
      * @param socket
-     * @param inetAddress
+     * @param bindAddress
      * @return real hostName
      * @throws UnknownHostException
      */
@@ -511,4 +513,13 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
     public boolean isSslServer() {
         return false;
     }
+
+    @Override
+    public boolean isAllowLinkStealing() {
+        return allowLinkStealing;
+    }
+
+    public void setAllowLinkStealing(boolean allowLinkStealing) {
+        this.allowLinkStealing = allowLinkStealing;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7e000d5a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
index 79b140a..ccf7abe 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
@@ -52,6 +52,7 @@ public class UdpTransportServer extends TransportServerSupport {
     private final Transport configuredTransport;
     private boolean usingWireFormatNegotiation;
     private final Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>();
+    private boolean allowLinkStealing;
 
     public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) {
         super(connectURI);
@@ -189,4 +190,13 @@ public class UdpTransportServer extends TransportServerSupport {
     public boolean isSslServer() {
         return false;
     }
+
+    @Override
+    public boolean isAllowLinkStealing() {
+        return allowLinkStealing;
+    }
+
+    public void setAllowLinkStealing(boolean allowLinkStealing) {
+        this.allowLinkStealing = allowLinkStealing;
+    }
 }


[3/3] git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4719

Posted by ra...@apache.org.
Fix for https://issues.apache.org/jira/browse/AMQ-4719


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d2ddd1dc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d2ddd1dc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d2ddd1dc

Branch: refs/heads/trunk
Commit: d2ddd1dcadffc6f16901f634a6b04314f48f9d8d
Parents: 47d1985
Author: Rob Davies <ra...@gmail.com>
Authored: Tue Nov 12 08:08:42 2013 +0000
Committer: Rob Davies <ra...@gmail.com>
Committed: Tue Nov 12 08:08:42 2013 +0000

----------------------------------------------------------------------
 .../transport/mqtt/MQTTNIOSSLTransportFactory.java     |  4 +++-
 .../transport/mqtt/MQTTNIOTransportFactory.java        |  4 +++-
 .../activemq/transport/mqtt/MQTTTransportFactory.java  | 13 +++++++++++++
 3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d2ddd1dc/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
index b9dfaba..f13b537 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
@@ -38,7 +38,7 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
 
     @Override
     protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
-        return new TcpTransportServer(this, location, serverSocketFactory) {
+        TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
             protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
                 MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket);
                 if (context != null) {
@@ -47,6 +47,8 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
                 return transport;
             }
         };
+        result.setAllowLinkStealing(true);
+        return result;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/d2ddd1dc/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
index f18e900..52fa228 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
@@ -49,11 +49,13 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
     }
 
     protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
-        return new TcpTransportServer(this, location, serverSocketFactory) {
+        TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
             protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
                 return new MQTTNIOTransport(format, socket);
             }
         };
+        result.setAllowLinkStealing(true);
+        return result;
     }
 
     protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d2ddd1dc/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
index de50cf2..7b4696a 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -25,9 +29,12 @@ import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.MutexTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
+import javax.net.ServerSocketFactory;
+
 /**
  * A <a href="http://mqtt.org/">MQTT</a> transport factory
  */
@@ -39,6 +46,12 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
         return "mqtt";
     }
 
+    protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        TcpTransportServer result =  new TcpTransportServer(this, location, serverSocketFactory);
+        result.setAllowLinkStealing(true);
+        return result;
+    }
+
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
         transport = new MQTTTransportFilter(transport, format, brokerContext);


[2/3] git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4719

Posted by ra...@apache.org.
Fix for https://issues.apache.org/jira/browse/AMQ-4719


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/47d19851
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/47d19851
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/47d19851

Branch: refs/heads/trunk
Commit: 47d198513915532e1e8a3bcb0f9a6f98f5a794dd
Parents: 7e000d5
Author: Rob Davies <ra...@gmail.com>
Authored: Tue Nov 12 08:08:07 2013 +0000
Committer: Rob Davies <ra...@gmail.com>
Committed: Tue Nov 12 08:08:07 2013 +0000

----------------------------------------------------------------------
 .../org/apache/activemq/broker/TransportConnector.java    |  4 ++++
 .../apache/activemq/transport/vm/VMTransportServer.java   | 10 ++++++++++
 2 files changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/47d19851/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
index 4e31ee0..582bc3f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
@@ -90,6 +90,9 @@ public class TransportConnector implements Connector, BrokerServiceAware {
                 setEnableStatusMonitor(false);
             }
         }
+        if (server != null){
+            setAllowLinkStealing(server.isAllowLinkStealing());
+        }
     }
 
     /**
@@ -123,6 +126,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
         rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
         rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
         rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
+        rc.setAllowLinkStealing(isAllowLinkStealing());
         return rc;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/47d19851/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
index b3dd21d..2f3d519 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
@@ -39,6 +39,7 @@ public class VMTransportServer implements TransportServer {
 
     private final AtomicInteger connectionCount = new AtomicInteger(0);
     private final boolean disposeOnDisconnect;
+    private boolean allowLinkStealing;
 
     /**
      * @param location
@@ -142,4 +143,13 @@ public class VMTransportServer implements TransportServer {
     public boolean isSslServer() {
         return false;
     }
+
+    @Override
+    public boolean isAllowLinkStealing() {
+        return allowLinkStealing;
+    }
+
+    public void setAllowLinkStealing(boolean allowLinkStealing) {
+        this.allowLinkStealing = allowLinkStealing;
+    }
 }