You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/25 05:04:37 UTC
[1/3] activemq-artemis git commit: ARTEMIS-208 BrokerInfo issue,
also: enlarged the default max size for tests to avoid send blocking.
Repository: activemq-artemis
Updated Branches:
refs/heads/master 7dfde208c -> 54d9a3e9b
ARTEMIS-208 BrokerInfo issue, also:
enlarged the default max size for tests to avoid send blocking.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/34e127cc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/34e127cc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/34e127cc
Branch: refs/heads/master
Commit: 34e127cc0ce1f53c354a9d08b3e0daa747b14f3e
Parents: 7dfde20
Author: Howard Gao <ho...@gmail.com>
Authored: Mon Aug 24 10:28:25 2015 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 24 22:54:12 2015 -0400
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 4 +++-
.../openwire/OpenWireProtocolManager.java | 21 ++++++++++++++++++++
.../core/remoting/impl/netty/NettyAcceptor.java | 4 ++++
.../artemiswrapper/ArtemisBrokerWrapper.java | 4 ++--
4 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 30ffb06..3cddb29 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -270,6 +270,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
private void negotiate(WireFormatInfo command) throws IOException {
this.wireFormat.renegotiateWireFormat(command);
+ //throw back a brokerInfo here
+ protocolManager.sendBrokerInfo(this);
}
@Override
@@ -1084,7 +1086,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
}
}
}
- catch (Exception e) {
+ catch (Throwable e) {
if (e instanceof ActiveMQSecurityException) {
resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 8c20c46..5489fdf 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -67,6 +68,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionId;
@@ -135,6 +137,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
private final ScheduledExecutorService scheduledPool;
+ private BrokerInfo brokerInfo = new BrokerInfo();
+
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory;
this.server = server;
@@ -148,6 +152,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
if (service != null) {
service.addNotificationListener(this);
}
+ brokerInfo.setBrokerName(server.getIdentity());
+ brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
+ brokerInfo.setPeerBrokerInfos(null);
+ brokerInfo.setFaultTolerantConfiguration(false);
+ brokerInfo.setBrokerURL(null);
+
}
public ProtocolManagerFactory<Interceptor> getFactory() {
@@ -162,6 +172,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
+ if (brokerInfo.getBrokerURL() == null) {
+ NettyAcceptor nettyAcceptor = (NettyAcceptor)acceptorUsed;
+ brokerInfo.setBrokerURL(nettyAcceptor.getURL());
+ }
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
owConn.init();
@@ -693,4 +707,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
server.destroyQueue(subQueueName);
}
+
+ public void sendBrokerInfo(OpenWireConnection connection) {
+ BrokerInfo copy = brokerInfo.copy();
+ //cluster support yet to support
+ copy.setPeerBrokerInfos(null);
+ connection.dispatchAsync(copy);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index f0fafaf..a53b886 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -579,6 +579,10 @@ public class NettyAcceptor implements Acceptor {
}
return sb.toString();
}
+
+ public String getURL() {
+ return "tcp://" + this.host + ":" + this.port;
+ }
// Inner classes -----------------------------------------------------------------------------
private final class ActiveMQServerChannelHandler extends ActiveMQChannelHandler implements ConnectionCreator {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index bf92a9c..723529f 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -200,7 +200,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
}
if (entry.isProducerFlowControl()) {
- settings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ settings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
if (bservice.getSystemUsage().isSendFailIfNoSpace()) {
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
}
@@ -215,7 +215,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
settingsMap.put("#", defSettings);
}
if (defaultEntry.isProducerFlowControl()) {
- defSettings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ defSettings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
if (bservice.getSystemUsage().isSendFailIfNoSpace()) {
defSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
}
[2/3] activemq-artemis git commit: ARTEMIS-208 fixing BrokerInfo,
using OpenWire connection instead of static property on the
protocolManager
Posted by cl...@apache.org.
ARTEMIS-208 fixing BrokerInfo, using OpenWire connection instead of static property on the protocolManager
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8935483c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8935483c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8935483c
Branch: refs/heads/master
Commit: 8935483cdd0081c5fef7cd983e19f91b00e44182
Parents: 34e127c
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Aug 24 22:25:13 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 24 22:54:59 2015 -0400
----------------------------------------------------------------------
.../remoting/impl/netty/NettyConnection.java | 8 +++++++
.../artemis/spi/core/remoting/Connection.java | 9 ++++++++
.../protocol/openwire/OpenWireConnection.java | 4 ++++
.../openwire/OpenWireProtocolManager.java | 23 ++++++++------------
.../core/remoting/impl/invm/InVMConnection.java | 4 ++++
.../core/remoting/impl/netty/NettyAcceptor.java | 3 ---
6 files changed, 34 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index f6fe267..e9e64a2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -298,6 +298,14 @@ public class NettyConnection implements Connection {
return address.toString();
}
+ public String getLocalAddress() {
+ SocketAddress address = channel.localAddress();
+ if (address == null) {
+ return null;
+ }
+ return "tcp://" + address.toString();
+ }
+
public boolean isDirectDeliver() {
return directDeliver;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index 76e5a3d..ac05267 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -91,6 +91,15 @@ public interface Connection {
String getRemoteAddress();
/**
+ * Returns a string representation of the local address this connection is connected to.
+ * This is useful when the server is configured at 0.0.0.0 (or multiple IPs).
+ * This will give you the actual IP that's being used.
+ *
+ * @return the local address
+ */
+ String getLocalAddress();
+
+ /**
* Called periodically to flush any data in the batch buffer
*/
void checkFlushBatchBuffer();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 3cddb29..abe7cdf 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -191,6 +191,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
this.creationTime = System.currentTimeMillis();
}
+ public String getLocalAddress() {
+ return transportConnection.getLocalAddress();
+ }
+
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 5489fdf..fa75fcf 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -46,7 +46,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -137,7 +136,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
private final ScheduledExecutorService scheduledPool;
- private BrokerInfo brokerInfo = new BrokerInfo();
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory;
@@ -152,11 +150,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
if (service != null) {
service.addNotificationListener(this);
}
- brokerInfo.setBrokerName(server.getIdentity());
- brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
- brokerInfo.setPeerBrokerInfos(null);
- brokerInfo.setFaultTolerantConfiguration(false);
- brokerInfo.setBrokerURL(null);
}
@@ -172,10 +165,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
- if (brokerInfo.getBrokerURL() == null) {
- NettyAcceptor nettyAcceptor = (NettyAcceptor)acceptorUsed;
- brokerInfo.setBrokerURL(nettyAcceptor.getURL());
- }
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
owConn.init();
@@ -709,9 +698,15 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
}
public void sendBrokerInfo(OpenWireConnection connection) {
- BrokerInfo copy = brokerInfo.copy();
+ BrokerInfo brokerInfo = new BrokerInfo();
+ brokerInfo.setBrokerName(server.getIdentity());
+ brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
+ brokerInfo.setPeerBrokerInfos(null);
+ brokerInfo.setFaultTolerantConfiguration(false);
+ brokerInfo.setBrokerURL(connection.getLocalAddress());
+
//cluster support yet to support
- copy.setPeerBrokerInfos(null);
- connection.dispatchAsync(copy);
+ brokerInfo.setPeerBrokerInfos(null);
+ connection.dispatchAsync(brokerInfo);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 0205141..59c319a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -208,6 +208,10 @@ public class InVMConnection implements Connection {
return "invm:" + serverID;
}
+ public String getLocalAddress() {
+ return "invm:" + serverID;
+ }
+
public int getBatchingBufferSize() {
return -1;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8935483c/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index a53b886..c9bc16a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -580,9 +580,6 @@ public class NettyAcceptor implements Acceptor {
return sb.toString();
}
- public String getURL() {
- return "tcp://" + this.host + ":" + this.port;
- }
// Inner classes -----------------------------------------------------------------------------
private final class ActiveMQServerChannelHandler extends ActiveMQChannelHandler implements ConnectionCreator {
[3/3] activemq-artemis git commit: This closes #135 openwire changes
on BrokerInfo
Posted by cl...@apache.org.
This closes #135 openwire changes on BrokerInfo
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54d9a3e9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54d9a3e9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54d9a3e9
Branch: refs/heads/master
Commit: 54d9a3e9bcf429c2357336e8c9766dd27bb17cbd
Parents: 7dfde20 8935483
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Aug 24 23:04:14 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 24 23:04:14 2015 -0400
----------------------------------------------------------------------
.../core/remoting/impl/netty/NettyConnection.java | 8 ++++++++
.../artemis/spi/core/remoting/Connection.java | 9 +++++++++
.../core/protocol/openwire/OpenWireConnection.java | 8 +++++++-
.../protocol/openwire/OpenWireProtocolManager.java | 16 ++++++++++++++++
.../core/remoting/impl/invm/InVMConnection.java | 4 ++++
.../core/remoting/impl/netty/NettyAcceptor.java | 1 +
.../broker/artemiswrapper/ArtemisBrokerWrapper.java | 4 ++--
7 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------