You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/25 05:04:37 UTC

[1/3] activemq-artemis git commit: ARTEMIS-208 BrokerInfo issue, also: enlarged the default max size for tests to avoid send blocking.

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 7dfde208c -> 54d9a3e9b


ARTEMIS-208 BrokerInfo issue, also:
  enlarged the default max size for tests to avoid send blocking.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/34e127cc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/34e127cc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/34e127cc

Branch: refs/heads/master
Commit: 34e127cc0ce1f53c354a9d08b3e0daa747b14f3e
Parents: 7dfde20
Author: Howard Gao <ho...@gmail.com>
Authored: Mon Aug 24 10:28:25 2015 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 24 22:54:12 2015 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  4 +++-
 .../openwire/OpenWireProtocolManager.java       | 21 ++++++++++++++++++++
 .../core/remoting/impl/netty/NettyAcceptor.java |  4 ++++
 .../artemiswrapper/ArtemisBrokerWrapper.java    |  4 ++--
 4 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 30ffb06..3cddb29 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -270,6 +270,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
 
    private void negotiate(WireFormatInfo command) throws IOException {
       this.wireFormat.renegotiateWireFormat(command);
+      //throw back a brokerInfo here
+      protocolManager.sendBrokerInfo(this);
    }
 
    @Override
@@ -1084,7 +1086,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
             }
          }
       }
-      catch (Exception e) {
+      catch (Throwable e) {
          if (e instanceof ActiveMQSecurityException) {
             resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 8c20c46..5489fdf 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -67,6 +68,7 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.ConnectionId;
@@ -135,6 +137,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    private final ScheduledExecutorService scheduledPool;
 
+   private BrokerInfo brokerInfo = new BrokerInfo();
+
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
       this.factory = factory;
       this.server = server;
@@ -148,6 +152,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       if (service != null) {
          service.addNotificationListener(this);
       }
+      brokerInfo.setBrokerName(server.getIdentity());
+      brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
+      brokerInfo.setPeerBrokerInfos(null);
+      brokerInfo.setFaultTolerantConfiguration(false);
+      brokerInfo.setBrokerURL(null);
+
    }
 
    public ProtocolManagerFactory<Interceptor> getFactory() {
@@ -162,6 +172,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
+      if (brokerInfo.getBrokerURL() == null) {
+         NettyAcceptor nettyAcceptor = (NettyAcceptor)acceptorUsed;
+         brokerInfo.setBrokerURL(nettyAcceptor.getURL());
+      }
       OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
       OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
       owConn.init();
@@ -693,4 +707,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
       server.destroyQueue(subQueueName);
    }
+
+   public void sendBrokerInfo(OpenWireConnection connection) {
+      BrokerInfo copy = brokerInfo.copy();
+      //cluster support yet to support
+      copy.setPeerBrokerInfos(null);
+      connection.dispatchAsync(copy);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index f0fafaf..a53b886 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -579,6 +579,10 @@ public class NettyAcceptor implements Acceptor {
       }
       return sb.toString();
    }
+
+   public String getURL() {
+      return "tcp://" + this.host + ":" + this.port;
+   }
    // Inner classes -----------------------------------------------------------------------------
 
    private final class ActiveMQServerChannelHandler extends ActiveMQChannelHandler implements ConnectionCreator {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index bf92a9c..723529f 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -200,7 +200,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
             settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
          }
          if (entry.isProducerFlowControl()) {
-            settings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+            settings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
             if (bservice.getSystemUsage().isSendFailIfNoSpace()) {
                settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
             }
@@ -215,7 +215,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
             settingsMap.put("#", defSettings);
          }
          if (defaultEntry.isProducerFlowControl()) {
-            defSettings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+            defSettings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
             if (bservice.getSystemUsage().isSendFailIfNoSpace()) {
                defSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
             }


[2/3] activemq-artemis git commit: ARTEMIS-208 fixing BrokerInfo, using OpenWire connection instead of static property on the protocolManager

Posted by cl...@apache.org.
ARTEMIS-208 fixing BrokerInfo, using OpenWire connection instead of static property on the protocolManager


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8935483c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8935483c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8935483c

Branch: refs/heads/master
Commit: 8935483cdd0081c5fef7cd983e19f91b00e44182
Parents: 34e127c
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Aug 24 22:25:13 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 24 22:54:59 2015 -0400

----------------------------------------------------------------------
 .../remoting/impl/netty/NettyConnection.java    |  8 +++++++
 .../artemis/spi/core/remoting/Connection.java   |  9 ++++++++
 .../protocol/openwire/OpenWireConnection.java   |  4 ++++
 .../openwire/OpenWireProtocolManager.java       | 23 ++++++++------------
 .../core/remoting/impl/invm/InVMConnection.java |  4 ++++
 .../core/remoting/impl/netty/NettyAcceptor.java |  3 ---
 6 files changed, 34 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index f6fe267..e9e64a2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -298,6 +298,14 @@ public class NettyConnection implements Connection {
       return address.toString();
    }
 
+   public String getLocalAddress() {
+      SocketAddress address = channel.localAddress();
+      if (address == null) {
+         return null;
+      }
+      return "tcp://" + address.toString();
+   }
+
    public boolean isDirectDeliver() {
       return directDeliver;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index 76e5a3d..ac05267 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -91,6 +91,15 @@ public interface Connection {
    String getRemoteAddress();
 
    /**
+    * Returns a string representation of the local address this connection is connected to.
+    * This is useful when the server is configured at 0.0.0.0 (or multiple IPs).
+    * This will give you the actual IP that's being used.
+    *
+    * @return the local address
+    */
+   String getLocalAddress();
+
+   /**
     * Called periodically to flush any data in the batch buffer
     */
    void checkFlushBatchBuffer();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 3cddb29..abe7cdf 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -191,6 +191,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
       this.creationTime = System.currentTimeMillis();
    }
 
+   public String getLocalAddress() {
+      return transportConnection.getLocalAddress();
+   }
+
    @Override
    public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 5489fdf..fa75fcf 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -46,7 +46,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -137,7 +136,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    private final ScheduledExecutorService scheduledPool;
 
-   private BrokerInfo brokerInfo = new BrokerInfo();
 
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
       this.factory = factory;
@@ -152,11 +150,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       if (service != null) {
          service.addNotificationListener(this);
       }
-      brokerInfo.setBrokerName(server.getIdentity());
-      brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
-      brokerInfo.setPeerBrokerInfos(null);
-      brokerInfo.setFaultTolerantConfiguration(false);
-      brokerInfo.setBrokerURL(null);
 
    }
 
@@ -172,10 +165,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
-      if (brokerInfo.getBrokerURL() == null) {
-         NettyAcceptor nettyAcceptor = (NettyAcceptor)acceptorUsed;
-         brokerInfo.setBrokerURL(nettyAcceptor.getURL());
-      }
       OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
       OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
       owConn.init();
@@ -709,9 +698,15 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
    }
 
    public void sendBrokerInfo(OpenWireConnection connection) {
-      BrokerInfo copy = brokerInfo.copy();
+      BrokerInfo brokerInfo = new BrokerInfo();
+      brokerInfo.setBrokerName(server.getIdentity());
+      brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
+      brokerInfo.setPeerBrokerInfos(null);
+      brokerInfo.setFaultTolerantConfiguration(false);
+      brokerInfo.setBrokerURL(connection.getLocalAddress());
+
       //cluster support yet to support
-      copy.setPeerBrokerInfos(null);
-      connection.dispatchAsync(copy);
+      brokerInfo.setPeerBrokerInfos(null);
+      connection.dispatchAsync(brokerInfo);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 0205141..59c319a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -208,6 +208,10 @@ public class InVMConnection implements Connection {
       return "invm:" + serverID;
    }
 
+   public String getLocalAddress() {
+      return "invm:" + serverID;
+   }
+
    public int getBatchingBufferSize() {
       return -1;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index a53b886..c9bc16a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -580,9 +580,6 @@ public class NettyAcceptor implements Acceptor {
       return sb.toString();
    }
 
-   public String getURL() {
-      return "tcp://" + this.host + ":" + this.port;
-   }
    // Inner classes -----------------------------------------------------------------------------
 
    private final class ActiveMQServerChannelHandler extends ActiveMQChannelHandler implements ConnectionCreator {


[3/3] activemq-artemis git commit: This closes #135 openwire changes on BrokerInfo

Posted by cl...@apache.org.
This closes #135 openwire changes on BrokerInfo


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54d9a3e9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54d9a3e9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54d9a3e9

Branch: refs/heads/master
Commit: 54d9a3e9bcf429c2357336e8c9766dd27bb17cbd
Parents: 7dfde20 8935483
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Aug 24 23:04:14 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 24 23:04:14 2015 -0400

----------------------------------------------------------------------
 .../core/remoting/impl/netty/NettyConnection.java   |  8 ++++++++
 .../artemis/spi/core/remoting/Connection.java       |  9 +++++++++
 .../core/protocol/openwire/OpenWireConnection.java  |  8 +++++++-
 .../protocol/openwire/OpenWireProtocolManager.java  | 16 ++++++++++++++++
 .../core/remoting/impl/invm/InVMConnection.java     |  4 ++++
 .../core/remoting/impl/netty/NettyAcceptor.java     |  1 +
 .../broker/artemiswrapper/ArtemisBrokerWrapper.java |  4 ++--
 7 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------