You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/02/12 21:01:47 UTC
[7/8] activemq-6 git commit: ACTIVEMQ6-7 - Improve Serialization on Connection Factory
ACTIVEMQ6-7 - Improve Serialization on Connection Factory
https://issues.apache.org/jira/browse/ACTIVEMQ6-7
Connection Factory is now externalizable and is serialized as a string that represents a URI. There are schemas for every possible type of connection factory and server locator.
The client JNDI representation of factories has also been changed to be consistent with this.
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/3b76ccc9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/3b76ccc9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/3b76ccc9
Branch: refs/heads/master
Commit: 3b76ccc92b5f0e00de54a4fd6c3705d1b8509771
Parents: b24d729
Author: Andy Taylor <an...@apache.org>
Authored: Thu Jan 29 09:18:36 2015 +0000
Committer: Andy Taylor <an...@apache.org>
Committed: Thu Feb 12 09:14:24 2015 +0000
----------------------------------------------------------------------
.../activemq/utils/uri/SchemaConstants.java | 31 ++
.../apache/activemq/utils/uri/URIFactory.java | 85 +++-
.../apache/activemq/utils/uri/URISchema.java | 86 +++-
.../apache/activemq/utils/URIParserTest.java | 12 +
.../api/core/BroadcastEndpointFactory.java | 2 +-
.../BroadcastEndpointFactoryConfiguration.java | 28 --
.../api/core/BroadcastGroupConfiguration.java | 18 +-
.../core/ChannelBroadcastEndpointFactory.java | 41 ++
.../api/core/DiscoveryGroupConfiguration.java | 67 +--
...ryGroupConfigurationCompatibilityHelper.java | 45 ---
.../api/core/JGroupsBroadcastEndpoint.java | 281 +++++++++++++
.../JGroupsBroadcastGroupConfiguration.java | 404 -------------------
.../core/JGroupsChannelBroadcastEndpoint.java | 39 ++
.../api/core/JGroupsFileBroadcastEndpoint.java | 49 +++
.../JGroupsFileBroadcastEndpointFactory.java | 55 +++
.../JGroupsPropertiesBroadcastEndpoint.java | 43 ++
...roupsPropertiesBroadcastEndpointFactory.java | 55 +++
.../api/core/UDPBroadcastEndpointFactory.java | 330 +++++++++++++++
.../core/UDPBroadcastGroupConfiguration.java | 339 ----------------
.../api/core/client/ActiveMQClient.java | 14 +
.../api/core/client/TopologyMember.java | 4 +-
.../core/client/impl/ServerLocatorImpl.java | 9 +-
.../remoting/ClientProtocolManagerFactory.java | 4 +-
.../uri/AbstractServerLocatorSchema.java | 34 ++
.../apache/activemq/uri/ConnectionOptions.java | 75 ++++
.../activemq/uri/InVMServerLocatorSchema.java | 73 ++++
.../uri/JGroupsServerLocatorSchema.java | 102 +++++
.../activemq/uri/ServerLocatorParser.java | 33 ++
.../activemq/uri/TCPServerLocatorSchema.java | 160 ++++++++
.../activemq/uri/UDPServerLocatorSchema.java | 89 ++++
activemq-jms-client/pom.xml | 7 +
.../activemq/api/jms/ActiveMQJMSClient.java | 13 +
.../jms/client/ActiveMQConnectionFactory.java | 70 +++-
.../client/ActiveMQJMSConnectionFactory.java | 1 -
.../jndi/ActiveMQInitialContextFactory.java | 359 ++--------------
.../apache/activemq/uri/AbstractCFSchema.java | 6 +-
.../activemq/uri/ConnectionFactoryParser.java | 2 +
.../apache/activemq/uri/ConnectionOptions.java | 120 ------
.../org/apache/activemq/uri/InVMSchema.java | 51 +++
.../org/apache/activemq/uri/JGroupsSchema.java | 53 ++-
.../activemq/uri/JMSConnectionOptions.java | 79 ++++
.../java/org/apache/activemq/uri/TCPSchema.java | 67 +++
.../java/org/apache/activemq/uri/UDPSchema.java | 33 +-
.../activemq/uri/ConnectionFactoryURITest.java | 368 ++++++++++++++++-
.../activemq/ra/ActiveMQResourceAdapter.java | 33 +-
.../activemq/core/config/Configuration.java | 3 +-
.../core/config/impl/ConfigurationImpl.java | 3 +-
.../deployers/impl/FileConfigurationParser.java | 26 +-
.../impl/BroadcastGroupControlImpl.java | 14 +-
.../remoting/impl/invm/InVMAcceptorFactory.java | 1 -
.../impl/invm/InVMConnectorFactory.java | 1 -
.../remoting/impl/invm/TransportConstants.java | 5 -
.../impl/netty/NettyAcceptorFactory.java | 1 -
.../server/impl/RemotingServiceImpl.java | 15 -
.../core/server/cluster/ClusterManager.java | 2 +-
.../spi/core/remoting/AcceptorFactory.java | 1 -
.../core/config/impl/FileConfigurationTest.java | 16 +-
docs/user-manual/en/using-jms.md | 105 ++---
.../aerogear/src/main/resources/jndi.properties | 2 +-
.../ApplicationLayerFailoverExample.java | 2 +-
.../activemq/jms/example/BridgeExample.java | 4 +-
.../browser/src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 6 +-
.../src/main/resources/jndi.properties | 2 +-
.../ClusteredDurableSubscriptionExample.java | 4 +-
.../jms/example/ClusteredGroupingExample.java | 6 +-
.../jms/example/ClusteredJgroupsExample.java | 4 +-
.../activemq/server0/client-jndi.properties | 2 +-
.../jms/example/ClusteredQueueExample.java | 4 +-
.../jms/example/ClusteredStandaloneExample.java | 6 +-
.../example/StaticClusteredQueueExample.java | 2 +-
.../jms/example/ClusterStaticOnewayExample.java | 2 +-
.../jms/example/ClusteredTopicExample.java | 4 +-
.../ColocatedFailoverScaleDownExample.java | 8 +-
.../jms/example/ColocatedFailoverExample.java | 8 +-
.../src/main/resources/jndi.properties | 3 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../activemq/jms/example/DivertExample.java | 4 +-
.../src/main/resources/jndi.properties | 2 +-
.../expiry/src/main/resources/jndi.properties | 2 +-
.../jms/example/HAPolicyAutoBackupExample.java | 12 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../activemq/jms/example/JMSBridgeExample.java | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../jms/jmx/src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 3 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 6 +-
.../src/main/resources/jndi.properties | 6 +-
.../src/main/resources/jndi.properties | 3 +-
.../src/main/resources/jndi.properties | 6 +-
.../paging/src/main/resources/jndi.properties | 2 +-
.../jms/perf/src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 3 +-
.../QueueMessageRedistributionExample.java | 4 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../queue/src/main/resources/jndi.properties | 2 +-
.../activemq/jms/example/ReattachExample.java | 2 +-
.../src/main/resources/jndi.properties | 7 +-
.../src/main/resources/jndi.properties | 6 +-
.../src/main/resources/jndi.properties | 6 +-
.../src/main/resources/jndi.properties | 6 +-
.../src/main/resources/jndi.properties | 6 +-
.../src/main/resources/jndi.properties | 6 +-
.../activemq/jms/example/ScaleDownExample.java | 12 +-
.../src/main/resources/jndi.properties | 2 +-
.../security/src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 3 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../stomp/src/main/resources/jndi.properties | 2 +-
.../stomp1.1/src/main/resources/jndi.properties | 2 +-
.../stomp1.2/src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 6 +-
.../jms/example/SymmetricClusterExample.java | 7 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../topic/src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 6 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../src/main/resources/jndi.properties | 2 +-
.../xa-send/src/main/resources/jndi.properties | 2 +-
.../activemq/jms/soak/example/SoakReceiver.java | 2 +-
.../activemq/jms/soak/example/SoakSender.java | 2 +-
.../client/ServerLocatorSerializationTest.java | 131 ------
.../integration/client/SessionFactoryTest.java | 48 +--
.../BridgeWithDiscoveryGroupStartTest.java | 8 +-
.../cluster/distribution/ClusterTestBase.java | 14 +-
.../HAClientTopologyWithDiscoveryTest.java | 8 +-
.../discovery/DiscoveryBaseTest.java | 12 +-
.../discovery/DiscoveryStayAliveTest.java | 10 +-
.../integration/discovery/DiscoveryTest.java | 67 ++-
.../jms/ActiveMQConnectionFactoryTest.java | 16 +-
.../integration/jms/SimpleJNDIClientTest.java | 383 ++++--------------
.../ConnectionFactorySerializationTest.java | 325 +++++++++++++--
...tionFactoryWithJGroupsSerializationTest.java | 19 +-
.../jms/server/JMSServerDeployerTest.java | 10 +-
.../management/ActiveMQServerControlTest.java | 3 +-
.../management/BroadcastGroupControlTest.java | 12 +-
.../ClusterConnectionControl2Test.java | 14 +-
.../ClusterConnectionControlTest.java | 8 +-
.../ra/ActiveMQRAClusteredTestBase.java | 5 +-
.../integration/ra/ResourceAdapterTest.java | 8 +-
.../src/test/resources/jndi.properties | 3 +-
.../jtests/jms/framework/PTPTestCase.java | 3 +-
.../jtests/jms/framework/PubSubTestCase.java | 3 +-
.../jtests/jms/framework/UnifiedTestCase.java | 5 +-
.../tests/unit/ra/ResourceAdapterTest.java | 4 +-
167 files changed, 3112 insertions(+), 2352 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-commons/src/main/java/org/apache/activemq/utils/uri/SchemaConstants.java
----------------------------------------------------------------------
diff --git a/activemq-commons/src/main/java/org/apache/activemq/utils/uri/SchemaConstants.java b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/SchemaConstants.java
new file mode 100644
index 0000000..6e7afe1
--- /dev/null
+++ b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/SchemaConstants.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.utils.uri;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class SchemaConstants
+{
+ public static final String TCP = "tcp";
+
+ public static final String UDP = "udp";
+
+ public static final String JGROUPS = "jgroups";
+
+ public static final String VM = "vm";
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URIFactory.java
----------------------------------------------------------------------
diff --git a/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URIFactory.java b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URIFactory.java
index 4bc5fa4..452bedd 100644
--- a/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URIFactory.java
+++ b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URIFactory.java
@@ -18,13 +18,13 @@
package org.apache.activemq.utils.uri;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author clebertsuconic
*/
-
public class URIFactory<T>
{
@@ -48,11 +48,25 @@ public class URIFactory<T>
schemaFactory.setFactory(this);
}
- public void removeSchema(final String schemaName)
+ public void removeSchema(final SchemaConstants schemaName)
{
schemas.remove(schemaName);
}
+ public T newObject(String uriString) throws Exception
+ {
+ URI uri = normalise(uriString);
+ URISchema<T> schemaFactory = schemas.get(uri.getScheme());
+
+ if (schemaFactory == null)
+ {
+ throw new NullPointerException("Schema " + uri.getScheme() + " not found");
+ }
+
+
+ return schemaFactory.newObject(uri);
+ }
+
public T newObject(URI uri) throws Exception
{
URISchema<T> schemaFactory = schemas.get(uri.getScheme());
@@ -66,5 +80,72 @@ public class URIFactory<T>
return schemaFactory.newObject(uri);
}
+ public void populateObject(URI uri, T bean) throws Exception
+ {
+ URISchema<T> schemaFactory = schemas.get(uri.getScheme());
+ if (schemaFactory == null)
+ {
+ throw new NullPointerException("Schema " + uri.getScheme() + " not found");
+ }
+
+ schemaFactory.populateObject(uri, bean);
+ }
+
+ public URI createSchema(String scheme, T bean) throws Exception
+ {
+ URISchema<T> schemaFactory = schemas.get(scheme);
+
+ if (schemaFactory == null)
+ {
+ throw new NullPointerException("Schema " + scheme + " not found");
+ }
+ return schemaFactory.newURI(bean);
+ }
+
+ /*
+ * This method is used to change a string containing multiple URIs into a single valid URI.
+ * For instance, it is possible to have the following String:
+ * (tcp://localhost:5445,tcp://localhost:5545,tcp://localhost:5555)?someQuery
+ * This is an invalid URI, so it is changed so that the first URI is used and the
+ * extra ones are added as the URI fragment, like so:
+ * tcp://localhost:5445?someQuery#tcp://localhost:5545,tcp://localhost:5555
+ *
+ * It is the job of the URISchema implementation to handle these fragments as needed.
+ * */
+ private URI normalise(String uri) throws URISyntaxException
+ {
+ if (uri.startsWith("("))
+ {
+ String[] split = uri.split("\\)");
+ String[] connectorURIS = split[0].substring(split[0].indexOf('(') + 1).split(",");
+ String factoryQuery = split.length > 1 ? split[1] : "";
+ StringBuilder builder = new StringBuilder(connectorURIS[0]);
+ if (factoryQuery != null && factoryQuery.length() > 0)
+ {
+ if (connectorURIS[0].contains("?"))
+ {
+ builder.append("&").append(factoryQuery.substring(1));
+ }
+ else
+ {
+ builder.append(factoryQuery);
+ }
+ }
+ if (connectorURIS.length > 1)
+ {
+ builder.append("#");
+ for (int i = 1; i < connectorURIS.length; i++)
+ {
+ if (i > 1)
+ {
+ builder.append(",");
+ }
+ builder.append(connectorURIS[i]);
+ }
+ }
+ return new URI(builder.toString());
+ }
+ return new URI(uri);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URISchema.java
----------------------------------------------------------------------
diff --git a/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URISchema.java b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URISchema.java
index 4226d71..26ec4cd 100644
--- a/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URISchema.java
+++ b/activemq-commons/src/main/java/org/apache/activemq/utils/uri/URISchema.java
@@ -14,15 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.activemq.utils.uri;
+import java.beans.PropertyDescriptor;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.FluentPropertyBeanIntrospector;
@@ -40,6 +42,16 @@ public abstract class URISchema<T>
return newObject(uri, null);
}
+ public void populateObject(URI uri, T bean) throws Exception
+ {
+ setData(uri, bean, parseQuery(uri.getQuery(), null));
+ }
+
+ public URI newURI(T bean) throws Exception
+ {
+ return internalNewURI(bean);
+ }
+
private URIFactory<T> parentFactory;
@@ -102,6 +114,8 @@ public abstract class URISchema<T>
protected abstract T internalNewObject(URI uri, Map<String, String> query) throws Exception;
+ protected abstract URI internalNewURI(T bean) throws Exception;
+
private static final BeanUtilsBean beanUtils = new BeanUtilsBean();
@@ -185,4 +199,74 @@ public abstract class URISchema<T>
}
return obj;
}
+
+ public static void setData(URI uri, HashMap<String, Object> properties, Set<String> allowableProperties, Map<String, String> query)
+ {
+ if (allowableProperties.contains("host"))
+ {
+ properties.put("host", uri.getHost());
+ }
+ if (allowableProperties.contains("port"))
+ {
+ properties.put("port", uri.getPort());
+ }
+ if (allowableProperties.contains("userInfo"))
+ {
+ properties.put("userInfo", uri.getUserInfo());
+ }
+ for (Map.Entry<String, String> entry : query.entrySet())
+ {
+ if (allowableProperties.contains(entry.getKey()))
+ {
+ properties.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ public static String getData(List<String> ignored, Object... beans) throws Exception
+ {
+ StringBuilder sb = new StringBuilder();
+ synchronized (beanUtils)
+ {
+ for (Object bean : beans)
+ {
+ if (bean != null)
+ {
+ PropertyDescriptor[] descriptors = beanUtils.getPropertyUtils().getPropertyDescriptors(bean);
+ for (PropertyDescriptor descriptor : descriptors)
+ {
+ if (descriptor.getReadMethod() != null && descriptor.getWriteMethod() != null && isWriteable(descriptor, ignored))
+ {
+ String value = beanUtils.getProperty(bean, descriptor.getName());
+ if (value != null)
+ {
+ sb.append("&").append(descriptor.getName()).append("=").append(value);
+ }
+ }
+ }
+ }
+ }
+ }
+ return sb.toString();
+ }
+
+ private static boolean isWriteable(PropertyDescriptor descriptor, List<String> ignored)
+ {
+ if (ignored != null && ignored.contains(descriptor.getName()))
+ {
+ return false;
+ }
+ Class<?> type = descriptor.getPropertyType();
+ return (type == Double.class) ||
+ (type == double.class) ||
+ (type == Long.class) ||
+ (type == long.class) ||
+ (type == Integer.class) ||
+ (type == int.class) ||
+ (type == Float.class) ||
+ (type == float.class) ||
+ (type == Boolean.class) ||
+ (type == boolean.class) ||
+ (type == String.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-commons/src/test/java/org/apache/activemq/utils/URIParserTest.java
----------------------------------------------------------------------
diff --git a/activemq-commons/src/test/java/org/apache/activemq/utils/URIParserTest.java b/activemq-commons/src/test/java/org/apache/activemq/utils/URIParserTest.java
index be389c6..1654ec4 100644
--- a/activemq-commons/src/test/java/org/apache/activemq/utils/URIParserTest.java
+++ b/activemq-commons/src/test/java/org/apache/activemq/utils/URIParserTest.java
@@ -109,6 +109,12 @@ public class URIParserTest
{
return setData(uri, new Fruit(getSchemaName()), query);
}
+
+ @Override
+ protected URI internalNewURI(FruitBase bean)
+ {
+ return null;
+ }
}
class FruitBaseSchema extends URISchema<FruitBase>
@@ -125,6 +131,12 @@ public class URIParserTest
{
return setData(uri, new FruitBase(getSchemaName()), query);
}
+
+ @Override
+ protected URI internalNewURI(FruitBase bean)
+ {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactory.java
index 2091c84..8fbe217 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactory.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactory.java
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.api.core;
-import java.io.Serializable;
+import java.io.Serializable;
public interface BroadcastEndpointFactory extends Serializable
{
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactoryConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactoryConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactoryConfiguration.java
deleted file mode 100644
index c600c88..0000000
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastEndpointFactoryConfiguration.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.api.core;
-
-import java.io.Serializable;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * 9/25/12
- */
-public interface BroadcastEndpointFactoryConfiguration extends Serializable
-{
- BroadcastEndpointFactory createBroadcastEndpointFactory();
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastGroupConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastGroupConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastGroupConfiguration.java
index 6714830..a27b543 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastGroupConfiguration.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BroadcastGroupConfiguration.java
@@ -38,7 +38,7 @@ public final class BroadcastGroupConfiguration implements Serializable
private long broadcastPeriod = ActiveMQDefaultConfiguration.getDefaultBroadcastPeriod();
- private BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration = null;
+ private BroadcastEndpointFactory endpointFactory = null;
private List<String> connectorInfos = null;
@@ -79,14 +79,14 @@ public final class BroadcastGroupConfiguration implements Serializable
return this;
}
- public BroadcastEndpointFactoryConfiguration getEndpointFactoryConfiguration()
+ public BroadcastEndpointFactory getEndpointFactory()
{
- return endpointFactoryConfiguration;
+ return endpointFactory;
}
- public BroadcastGroupConfiguration setEndpointFactoryConfiguration(BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration)
+ public BroadcastGroupConfiguration setEndpointFactory(BroadcastEndpointFactory endpointFactory)
{
- this.endpointFactoryConfiguration = endpointFactoryConfiguration;
+ this.endpointFactory = endpointFactory;
return this;
}
@@ -97,7 +97,7 @@ public final class BroadcastGroupConfiguration implements Serializable
int result = 1;
result = prime * result + (int)(broadcastPeriod ^ (broadcastPeriod >>> 32));
result = prime * result + ((connectorInfos == null) ? 0 : connectorInfos.hashCode());
- result = prime * result + ((endpointFactoryConfiguration == null) ? 0 : endpointFactoryConfiguration.hashCode());
+ result = prime * result + ((endpointFactory == null) ? 0 : endpointFactory.hashCode());
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@@ -121,12 +121,12 @@ public final class BroadcastGroupConfiguration implements Serializable
}
else if (!connectorInfos.equals(other.connectorInfos))
return false;
- if (endpointFactoryConfiguration == null)
+ if (endpointFactory == null)
{
- if (other.endpointFactoryConfiguration != null)
+ if (other.endpointFactory != null)
return false;
}
- else if (!endpointFactoryConfiguration.equals(other.endpointFactoryConfiguration))
+ else if (!endpointFactory.equals(other.endpointFactory))
return false;
if (name == null)
{
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/ChannelBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/ChannelBroadcastEndpointFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/ChannelBroadcastEndpointFactory.java
new file mode 100644
index 0000000..ce77d6f
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/ChannelBroadcastEndpointFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import org.jgroups.JChannel;
+
+/**
+* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+*/
+public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
+{
+ private final JChannel channel;
+
+ private final String channelName;
+
+ public ChannelBroadcastEndpointFactory(JChannel channel, String channelName)
+ {
+ this.channel = channel;
+ this.channelName = channelName;
+ }
+
+ @Override
+ public BroadcastEndpoint createBroadcastEndpoint() throws Exception
+ {
+ return new JGroupsChannelBroadcastEndpoint(channel, channelName).initChannel();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfiguration.java
index 5789cc6..cbe0d7a 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfiguration.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfiguration.java
@@ -16,9 +16,6 @@
*/
package org.apache.activemq.api.core;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.activemq.api.core.client.ActiveMQClient;
@@ -48,29 +45,9 @@ public final class DiscoveryGroupConfiguration implements Serializable
private long discoveryInitialWaitTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
/*
- * The localBindAddress is needed so we can be backward compatible with 2.2 clients
- * */
- private transient String localBindAddress = null;
-
- /*
- * The localBindPort is needed so we can be backward compatible with 2.2 clients
- * */
- private transient int localBindPort = -1;
-
- /*
- * The groupAddress is needed so we can be backward compatible with 2.2 clients
- * */
- private String groupAddress = null;
-
- /*
- * The groupPort is needed so we can be backward compatible with 2.2 clients
- * */
- private int groupPort = -1;
-
- /*
* This is the actual object used by the class, it has to be transient so we can handle deserialization with a 2.2 client
* */
- private transient BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration;
+ private BroadcastEndpointFactory endpointFactory;
public DiscoveryGroupConfiguration()
{
@@ -121,51 +98,17 @@ public final class DiscoveryGroupConfiguration implements Serializable
return this;
}
- public BroadcastEndpointFactoryConfiguration getBroadcastEndpointFactoryConfiguration()
+ public BroadcastEndpointFactory getBroadcastEndpointFactory()
{
- return endpointFactoryConfiguration;
+ return endpointFactory;
}
- public DiscoveryGroupConfiguration setBroadcastEndpointFactoryConfiguration(BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration)
+ public DiscoveryGroupConfiguration setBroadcastEndpointFactory(BroadcastEndpointFactory endpointFactory)
{
- this.endpointFactoryConfiguration = endpointFactoryConfiguration;
- if (endpointFactoryConfiguration instanceof DiscoveryGroupConfigurationCompatibilityHelper)
- {
- DiscoveryGroupConfigurationCompatibilityHelper dgcch = (DiscoveryGroupConfigurationCompatibilityHelper) endpointFactoryConfiguration;
- localBindAddress = dgcch.getLocalBindAddress();
- localBindPort = dgcch.getLocalBindPort();
- groupAddress = dgcch.getGroupAddress();
- groupPort = dgcch.getGroupPort();
- }
+ this.endpointFactory = endpointFactory;
return this;
}
- private void writeObject(ObjectOutputStream out) throws IOException
- {
- out.defaultWriteObject();
- if (groupPort < 0)
- {
- out.writeObject(endpointFactoryConfiguration);
- }
- }
-
- private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException
- {
- in.defaultReadObject();
- if (groupPort < 0)
- {
- endpointFactoryConfiguration = (BroadcastEndpointFactoryConfiguration) in.readObject();
- }
- else
- {
- endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration()
- .setGroupAddress(groupAddress)
- .setGroupPort(groupPort)
- .setLocalBindAddress(localBindAddress)
- .setLocalBindPort(localBindPort);
- }
- }
-
@Override
public boolean equals(Object o)
{
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java
deleted file mode 100644
index 4fd1bca..0000000
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/DiscoveryGroupConfigurationCompatibilityHelper.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.api.core;
-
-/**
- * This interface is needed for making a DiscoveryGroupConfiguration backward
- * compatible with version 2.2 clients. It is used to extract from new
- * {@link org.apache.activemq.api.core.BroadcastEndpointFactoryConfiguration} the four
- * UDP attributes in order to form a version 2.2 DiscoveryGroupConfiguration
- * in time of serialization.
- *
- * @see DiscoveryGroupConfiguration#readObject(java.io.ObjectInputStream)
- * @see DiscoveryGroupConfiguration#writeObject(java.io.ObjectOutputStream)
- *
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * 12/13/12
- */
-public interface DiscoveryGroupConfigurationCompatibilityHelper
-{
-// XXX No javadocs
- String getLocalBindAddress();
-
-// XXX No javadocs
- int getLocalBindPort();
-
-// XXX No javadocs
- String getGroupAddress();
-
-// XXX No javadocs
- int getGroupPort();
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastEndpoint.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastEndpoint.java
new file mode 100644
index 0000000..07381ed
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastEndpoint.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import org.jgroups.JChannel;
+import org.jgroups.ReceiverAdapter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is the implementation of ActiveMQ members discovery that will use JGroups.
+ *
+ * @author Howard Gao
+ */
+public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint
+{
+ private final String channelName;
+
+ private boolean clientOpened;
+
+ private boolean broadcastOpened;
+
+ private JChannelWrapper channel;
+
+ private JGroupsReceiver receiver;
+
+ public JGroupsBroadcastEndpoint(String channelName)
+ {
+ this.channelName = channelName;
+ }
+
+ public void broadcast(final byte[] data) throws Exception
+ {
+ if (broadcastOpened)
+ {
+ org.jgroups.Message msg = new org.jgroups.Message();
+
+ msg.setBuffer(data);
+
+ channel.send(msg);
+ }
+ }
+
+ public byte[] receiveBroadcast() throws Exception
+ {
+ if (clientOpened)
+ {
+ return receiver.receiveBroadcast();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
+ {
+ if (clientOpened)
+ {
+ return receiver.receiveBroadcast(time, unit);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public synchronized void openClient() throws Exception
+ {
+ if (clientOpened)
+ {
+ return;
+ }
+ internalOpen();
+ receiver = new JGroupsReceiver();
+ channel.setReceiver(receiver);
+ clientOpened = true;
+ }
+
+ public synchronized void openBroadcaster() throws Exception
+ {
+ if (broadcastOpened) return;
+ internalOpen();
+ broadcastOpened = true;
+ }
+
+ public abstract JChannel createChannel() throws Exception;
+
+ public JGroupsBroadcastEndpoint initChannel() throws Exception
+ {
+ this.channel = JChannelManager.getJChannel(channelName, this);
+ return this;
+ }
+
+ protected void internalOpen() throws Exception
+ {
+ channel.connect();
+ }
+
+ public synchronized void close(boolean isBroadcast) throws Exception
+ {
+ if (isBroadcast)
+ {
+ broadcastOpened = false;
+ }
+ else
+ {
+ channel.removeReceiver(receiver);
+ clientOpened = false;
+ }
+ channel.close();
+ }
+
+ /**
+ * This class is used to receive messages from a JGroups channel.
+ * Incoming messages are put into a queue.
+ */
+ private static final class JGroupsReceiver extends ReceiverAdapter
+ {
+ private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<byte[]>();
+
+ @Override
+ public void receive(org.jgroups.Message msg)
+ {
+ dequeue.add(msg.getBuffer());
+ }
+
+ public byte[] receiveBroadcast() throws Exception
+ {
+ return dequeue.take();
+ }
+
+ public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
+ {
+ return dequeue.poll(time, unit);
+ }
+ }
+
+ /**
+ * This class wraps a JChannel with a reference counter. The reference counter
+ * controls the lifetime of the JChannel. When the reference count reaches zero,
+ * the channel is disconnected and closed.
+ */
+ protected static class JChannelWrapper
+ {
+ int refCount = 1;
+ JChannel channel;
+ String channelName;
+ List<JGroupsReceiver> receivers = new ArrayList<JGroupsReceiver>();
+
+ public JChannelWrapper(String channelName, JChannel channel) throws Exception
+ {
+ this.refCount = 1;
+ this.channelName = channelName;
+ this.channel = channel;
+ }
+
+ public synchronized void close()
+ {
+ refCount--;
+ if (refCount == 0)
+ {
+ JChannelManager.closeChannel(this.channelName, channel);
+ }
+ }
+
+ public void removeReceiver(JGroupsReceiver receiver)
+ {
+ synchronized (receivers)
+ {
+ receivers.remove(receiver);
+ }
+ }
+
+ public synchronized void connect() throws Exception
+ {
+ if (channel.isConnected()) return;
+ channel.setReceiver(new ReceiverAdapter()
+ {
+
+ @Override
+ public void receive(org.jgroups.Message msg)
+ {
+ synchronized (receivers)
+ {
+ for (JGroupsReceiver r : receivers)
+ {
+ r.receive(msg);
+ }
+ }
+ }
+ });
+ channel.connect(channelName);
+ }
+
+ public void setReceiver(JGroupsReceiver jGroupsReceiver)
+ {
+ synchronized (receivers)
+ {
+ receivers.add(jGroupsReceiver);
+ }
+ }
+
+ public void send(org.jgroups.Message msg) throws Exception
+ {
+ channel.send(msg);
+ }
+
+ public JChannelWrapper addRef()
+ {
+ this.refCount++;
+ return this;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName;
+ }
+ }
+
+ /**
+ * This class maintains a global Map of JChannels wrapped in JChannelWrapper for
+ * the purpose of reference counting.
+ * <p/>
+ * Wherever a JChannel is needed, it should only be obtained by calling the getJChannel()
+ * method of this class. The actual disconnect of channels is also done only here.
+ */
+ protected static class JChannelManager
+ {
+ private static Map<String, JChannelWrapper> channels;
+
+ public static synchronized JChannelWrapper getJChannel(String channelName, JGroupsBroadcastEndpoint endpoint) throws Exception
+ {
+ if (channels == null)
+ {
+ channels = new HashMap<>();
+ }
+ JChannelWrapper wrapper = channels.get(channelName);
+ if (wrapper == null)
+ {
+ wrapper = new JChannelWrapper(channelName, endpoint.createChannel());
+ channels.put(channelName, wrapper);
+ return wrapper;
+ }
+ return wrapper.addRef();
+ }
+
+ public static synchronized void closeChannel(String channelName, JChannel channel)
+ {
+ channel.setReceiver(null);
+ channel.disconnect();
+ channel.close();
+ JChannelWrapper wrapper = channels.remove(channelName);
+ if (wrapper == null)
+ {
+ throw new IllegalStateException("Did not find channel " + channelName);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastGroupConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastGroupConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastGroupConfiguration.java
deleted file mode 100644
index 9d2a0ac..0000000
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsBroadcastGroupConfiguration.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.api.core;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import org.jgroups.JChannel;
-import org.jgroups.Message;
-import org.jgroups.ReceiverAdapter;
-import org.jgroups.conf.PlainConfigurator;
-
-/**
- * The configuration for creating broadcasting/discovery groups using JGroups channels
- * There are two ways to constructing a JGroups channel (JChannel):
- * <ol>
- * <li> by passing in a JGroups configuration file<br>
- * The file must exists in the activemq classpath. ActiveMQ creates a JChannel with the
- * configuration file and use it for broadcasting and discovery. In standalone server
- * mode ActiveMQ uses this way for constructing JChannels.</li>
- * <li> by passing in a JChannel instance<br>
- * This is useful when ActiveMQ needs to get a JChannel from a running JGroups service as in the
- * case of AS7 integration.</li>
- * </ol>
- * <p>
- * Note only one JChannel is needed in a VM. To avoid the channel being prematurely disconnected
- * by any party, a wrapper class is used.
- *
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
- * @see JChannelWrapper, JChannelManager
- */
-public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpointFactoryConfiguration, DiscoveryGroupConfigurationCompatibilityHelper
-{
- private static final long serialVersionUID = 8952238567248461285L;
-
- private final BroadcastEndpointFactory factory;
-
- public JGroupsBroadcastGroupConfiguration(final String jgroupsFile, final String channelName)
- {
- factory = new BroadcastEndpointFactory()
- {
- private static final long serialVersionUID = 1047956472941098435L;
-
- @Override
- public BroadcastEndpoint createBroadcastEndpoint() throws Exception
- {
- JGroupsBroadcastEndpoint endpoint = new JGroupsBroadcastEndpoint();
- endpoint.initChannel(jgroupsFile, channelName);
- return endpoint;
- }
- };
- }
-
- public JGroupsBroadcastGroupConfiguration(final JChannel channel, final String channelName)
- {
- factory = new BroadcastEndpointFactory()
- {
- private static final long serialVersionUID = 5110372849181145377L;
-
- @Override
- public BroadcastEndpoint createBroadcastEndpoint() throws Exception
- {
- JGroupsBroadcastEndpoint endpoint = new JGroupsBroadcastEndpoint();
- endpoint.initChannel(channel, channelName);
- return endpoint;
- }
- };
- }
-
- @Override
- public BroadcastEndpointFactory createBroadcastEndpointFactory()
- {
- return factory;
- }
-
- @Override
- public String getLocalBindAddress()
- {
- return null;
- }
-
- @Override
- /*
- * return -1 to force deserialization of object
- * */
- public int getLocalBindPort()
- {
- return -1;
- }
-
- @Override
- public String getGroupAddress()
- {
- return null;
- }
-
- @Override
- public int getGroupPort()
- {
- return -1;
- }
-
- /**
- * This class is the implementation of ActiveMQ members discovery that will use JGroups.
- *
- * @author Howard Gao
- */
- private static final class JGroupsBroadcastEndpoint implements BroadcastEndpoint
- {
- private boolean clientOpened;
-
- private boolean broadcastOpened;
-
- private JChannelWrapper<?> channel;
-
- private JGroupsReceiver receiver;
-
- public void broadcast(final byte[] data) throws Exception
- {
- if (broadcastOpened)
- {
- Message msg = new Message();
-
- msg.setBuffer(data);
-
- channel.send(msg);
- }
- }
-
- public byte[] receiveBroadcast() throws Exception
- {
- if (clientOpened)
- {
- return receiver.receiveBroadcast();
- }
- else
- {
- return null;
- }
- }
-
- public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
- {
- if (clientOpened)
- {
- return receiver.receiveBroadcast(time, unit);
- }
- else
- {
- return null;
- }
- }
-
- public synchronized void openClient() throws Exception
- {
- if (clientOpened)
- {
- return;
- }
- internalOpen();
- receiver = new JGroupsReceiver();
- channel.setReceiver(receiver);
- clientOpened = true;
- }
-
- public synchronized void openBroadcaster() throws Exception
- {
- if (broadcastOpened) return;
- internalOpen();
- broadcastOpened = true;
- }
-
- private void initChannel(final String jgroupsConfig, final String channelName) throws Exception
- {
- PlainConfigurator configurator = new PlainConfigurator(jgroupsConfig);
- try
- {
- this.channel = JChannelManager.getJChannel(channelName, configurator);
- return;
- }
- catch (Exception e)
- {
- this.channel = null;
- }
- URL configURL = Thread.currentThread().getContextClassLoader().getResource(jgroupsConfig);
-
- if (configURL == null)
- {
- throw new RuntimeException("couldn't find JGroups configuration " + jgroupsConfig);
- }
- this.channel = JChannelManager.getJChannel(channelName, configURL);
- }
-
- private void initChannel(final JChannel channel1, final String channelName) throws Exception
- {
- this.channel = JChannelManager.getJChannel(channelName, channel1);
- }
-
- protected void internalOpen() throws Exception
- {
- channel.connect();
- }
-
- public synchronized void close(boolean isBroadcast) throws Exception
- {
- if (isBroadcast)
- {
- broadcastOpened = false;
- }
- else
- {
- channel.removeReceiver(receiver);
- clientOpened = false;
- }
- channel.close();
- }
-
- /**
- * This class is used to receive messages from a JGroups channel.
- * Incoming messages are put into a queue.
- */
- private static final class JGroupsReceiver extends ReceiverAdapter
- {
- private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<byte[]>();
-
- @Override
- public void receive(org.jgroups.Message msg)
- {
- dequeue.add(msg.getBuffer());
- }
-
- public byte[] receiveBroadcast() throws Exception
- {
- return dequeue.take();
- }
-
- public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
- {
- return dequeue.poll(time, unit);
- }
- }
-
- /**
- * This class wraps a JChannel with a reference counter. The reference counter
- * controls the life of the JChannel. When reference count is zero, the channel
- * will be disconnected.
- *
- * @param <T>
- */
- private static class JChannelWrapper<T>
- {
- int refCount = 1;
- JChannel channel;
- String channelName;
- List<JGroupsReceiver> receivers = new ArrayList<JGroupsReceiver>();
-
- public JChannelWrapper(String channelName, T t) throws Exception
- {
- this.refCount = 1;
- this.channelName = channelName;
- if (t instanceof URL)
- {
- this.channel = new JChannel((URL) t);
- }
- else if (t instanceof JChannel)
- {
- this.channel = (JChannel) t;
- }
- else if (t instanceof PlainConfigurator)
- {
- this.channel = new JChannel((PlainConfigurator)t);
- }
- else
- {
- throw new IllegalArgumentException("Unsupported type " + t);
- }
- }
-
- public synchronized void close()
- {
- refCount--;
- if (refCount == 0)
- {
- JChannelManager.closeChannel(this.channelName, channel);
- }
- }
-
- public void removeReceiver(JGroupsReceiver receiver)
- {
- synchronized (receivers)
- {
- receivers.remove(receiver);
- }
- }
-
- public synchronized void connect() throws Exception
- {
- if (channel.isConnected()) return;
- channel.setReceiver(new ReceiverAdapter()
- {
-
- @Override
- public void receive(Message msg)
- {
- synchronized (receivers)
- {
- for (JGroupsReceiver r : receivers)
- {
- r.receive(msg);
- }
- }
- }
- });
- channel.connect(channelName);
- }
-
- public void setReceiver(JGroupsReceiver jGroupsReceiver)
- {
- synchronized (receivers)
- {
- receivers.add(jGroupsReceiver);
- }
- }
-
- public void send(Message msg) throws Exception
- {
- channel.send(msg);
- }
-
- public JChannelWrapper<T> addRef()
- {
- this.refCount++;
- return this;
- }
-
- @Override
- public String toString()
- {
- return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName;
- }
- }
-
- /**
- * This class maintain a global Map of JChannels wrapped in JChannelWrapper for
- * the purpose of reference counting.
- * <p/>
- * Wherever a JChannel is needed it should only get it by calling the getChannel()
- * method of this class. The real disconnect of channels are also done here only.
- */
- private static class JChannelManager
- {
- private static Map<String, JChannelWrapper<?>> channels;
-
- public static synchronized <T> JChannelWrapper<?> getJChannel(String channelName, T t) throws Exception
- {
- if (channels == null)
- {
- channels = new HashMap<String, JChannelWrapper<?>>();
- }
- JChannelWrapper<?> wrapper = channels.get(channelName);
- if (wrapper == null)
- {
- wrapper = new JChannelWrapper<T>(channelName, t);
- channels.put(channelName, wrapper);
- return wrapper;
- }
- return wrapper.addRef();
- }
-
- public static synchronized void closeChannel(String channelName, JChannel channel)
- {
- channel.setReceiver(null);
- channel.disconnect();
- channel.close();
- JChannelWrapper<?> wrapper = channels.remove(channelName);
- if (wrapper == null)
- {
- throw new IllegalStateException("Did not find channel " + channelName);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsChannelBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsChannelBroadcastEndpoint.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsChannelBroadcastEndpoint.java
new file mode 100644
index 0000000..e14d0f0
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsChannelBroadcastEndpoint.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import org.jgroups.JChannel;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class JGroupsChannelBroadcastEndpoint extends JGroupsBroadcastEndpoint
+{
+ private final JChannel jChannel;
+
+ public JGroupsChannelBroadcastEndpoint(JChannel jChannel, final String channelName) throws Exception
+ {
+ super(channelName);
+ this.jChannel = jChannel;
+ }
+
+ @Override
+ public JChannel createChannel() throws Exception
+ {
+ return jChannel;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpoint.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpoint.java
new file mode 100644
index 0000000..6f0b8ad
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpoint.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import org.jgroups.JChannel;
+
+import java.net.URL;
+
+/**
+ * JGroups-based implementation of ActiveMQ member discovery whose channel is created from a configuration resource.
+ *
+ * @author Howard Gao
+ */
+public final class JGroupsFileBroadcastEndpoint extends JGroupsBroadcastEndpoint
+{
+ private String file;
+
+ public JGroupsFileBroadcastEndpoint(final String file, final String channelName) throws Exception
+ {
+ super(channelName);
+ this.file = file;
+ }
+
+ public JChannel createChannel() throws Exception
+ {
+ URL configURL = Thread.currentThread().getContextClassLoader().getResource(file);
+
+ if (configURL == null)
+ {
+ throw new RuntimeException("couldn't find JGroups configuration " + file);
+ }
+
+ return new JChannel(configURL);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpointFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpointFactory.java
new file mode 100644
index 0000000..21bef7a
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsFileBroadcastEndpointFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+/**
+* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+*/
+public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFactory
+{
+ private String file;
+
+ private String channelName;
+
+ @Override
+ public BroadcastEndpoint createBroadcastEndpoint() throws Exception
+ {
+ return new JGroupsFileBroadcastEndpoint(file, channelName).initChannel();
+ }
+
+ public String getFile()
+ {
+ return file;
+ }
+
+ public JGroupsFileBroadcastEndpointFactory setFile(String file)
+ {
+ this.file = file;
+ return this;
+ }
+
+ public String getChannelName()
+ {
+ return channelName;
+ }
+
+ public JGroupsFileBroadcastEndpointFactory setChannelName(String channelName)
+ {
+ this.channelName = channelName;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpoint.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpoint.java
new file mode 100644
index 0000000..ddc5c19
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpoint.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import org.jgroups.JChannel;
+import org.jgroups.conf.PlainConfigurator;
+
+/**
+ * JGroups-based implementation of ActiveMQ member discovery whose channel is created from a properties string.
+ *
+ * @author Howard Gao
+ */
+public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint
+{
+ private String properties;
+
+ public JGroupsPropertiesBroadcastEndpoint(final String properties, final String channelName) throws Exception
+ {
+ super(channelName);
+ this.properties = properties;
+ }
+
+ @Override
+ public JChannel createChannel() throws Exception
+ {
+ PlainConfigurator configurator = new PlainConfigurator(properties);
+ return new JChannel(configurator);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpointFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
new file mode 100644
index 0000000..b9d06b9
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+/**
+* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+*/
+public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpointFactory
+{
+ private String properties;
+
+ private String channelName;
+
+ @Override
+ public BroadcastEndpoint createBroadcastEndpoint() throws Exception
+ {
+ return new JGroupsPropertiesBroadcastEndpoint(properties, channelName).initChannel();
+ }
+
+ public String getProperties()
+ {
+ return properties;
+ }
+
+ public JGroupsPropertiesBroadcastEndpointFactory setProperties(String properties)
+ {
+ this.properties = properties;
+ return this;
+ }
+
+ public String getChannelName()
+ {
+ return channelName;
+ }
+
+ public JGroupsPropertiesBroadcastEndpointFactory setChannelName(String channelName)
+ {
+ this.channelName = channelName;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastEndpointFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastEndpointFactory.java
new file mode 100644
index 0000000..b2f6095
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastEndpointFactory.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.api.core;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.core.client.ActiveMQClientLogger;
+
+
+/**
+ * Factory for {@link BroadcastEndpoint}s that send and receive broadcasts over UDP.
+ * <p>
+ * The factory stores the group address/port and the optional local bind address/port as
+ * plain values. Addresses are only resolved to {@link InetAddress} instances when
+ * {@link #createBroadcastEndpoint()} is called.
+ * <p>
+ * {@link #equals(Object)} and {@link #hashCode()} consider the group address and group port
+ * only; the local bind settings do not take part in the comparison.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a> Created 18 Nov 2008 08:44:30
+ */
+public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFactory
+{
+   private transient String localBindAddress = null;
+
+   private transient int localBindPort = -1;
+
+   private String groupAddress = null;
+
+   private int groupPort = -1;
+
+   public UDPBroadcastEndpointFactory()
+   {
+   }
+
+   /**
+    * Creates a new, not yet opened, UDP endpoint from the current settings.
+    * <p>
+    * A {@code null} group or local bind address is passed on as {@code null}; any other value
+    * is resolved with {@link InetAddress#getByName(String)}.
+    *
+    * @return the configured endpoint
+    * @throws Exception if one of the configured addresses cannot be resolved
+    */
+   @Override
+   public BroadcastEndpoint createBroadcastEndpoint() throws Exception
+   {
+      return new UDPBroadcastEndpoint()
+         .setGroupAddress(groupAddress != null ? InetAddress.getByName(groupAddress) : null)
+         .setGroupPort(groupPort)
+         .setLocalBindAddress(localBindAddress != null ? InetAddress.getByName(localBindAddress) : null)
+         .setLocalBindPort(localBindPort);
+   }
+
+   /**
+    * @return the group address as configured, or {@code null} if none was set
+    */
+   public String getGroupAddress()
+   {
+      return groupAddress;
+   }
+
+   /**
+    * @param groupAddress the address datagrams are sent to and whose group receivers join
+    * @return this factory
+    */
+   public UDPBroadcastEndpointFactory setGroupAddress(String groupAddress)
+   {
+      this.groupAddress = groupAddress;
+      return this;
+   }
+
+   /**
+    * @return the group port, {@code -1} if none was set
+    */
+   public int getGroupPort()
+   {
+      return groupPort;
+   }
+
+   /**
+    * @param groupPort the port datagrams are sent to and receivers listen on
+    * @return this factory
+    */
+   public UDPBroadcastEndpointFactory setGroupPort(int groupPort)
+   {
+      this.groupPort = groupPort;
+      return this;
+   }
+
+   /**
+    * @return the local port the broadcasting socket binds to, {@code -1} if none was set
+    */
+   public int getLocalBindPort()
+   {
+      return localBindPort;
+   }
+
+   /**
+    * @param localBindPort the local port for the broadcasting socket; {@code -1} lets the
+    *                      endpoint create an unbound-port socket instead
+    * @return this factory
+    */
+   public UDPBroadcastEndpointFactory setLocalBindPort(int localBindPort)
+   {
+      this.localBindPort = localBindPort;
+      return this;
+   }
+
+   /**
+    * @return the local bind address as configured, or {@code null} if none was set
+    */
+   public String getLocalBindAddress()
+   {
+      return localBindAddress;
+   }
+
+   /**
+    * @param localBindAddress the local address used when binding the broadcasting socket and
+    *                         as the interface of the receiving socket
+    * @return this factory
+    */
+   public UDPBroadcastEndpointFactory setLocalBindAddress(String localBindAddress)
+   {
+      this.localBindAddress = localBindAddress;
+      return this;
+   }
+
+   /**
+    * <p> This is the member discovery implementation using direct UDP. It was extracted as a refactoring from
+    * {@link org.apache.activemq.core.cluster.DiscoveryGroup}</p>
+    *
+    * @author Tomohisa
+    * @author Howard Gao
+    * @author Clebert Suconic
+    */
+   private static class UDPBroadcastEndpoint implements BroadcastEndpoint
+   {
+      /** Read timeout, in milliseconds, applied to the receiving socket. */
+      private static final int SOCKET_TIMEOUT = 500;
+
+      private InetAddress localAddress;
+
+      private int localBindPort;
+
+      private InetAddress groupAddress;
+
+      private int groupPort;
+
+      private DatagramSocket broadcastingSocket;
+
+      private MulticastSocket receivingSocket;
+
+      /** Set by the open methods and cleared by {@link #close(boolean)}; ends the receive loop. */
+      private volatile boolean open;
+
+      public UDPBroadcastEndpoint()
+      {
+      }
+
+      public UDPBroadcastEndpoint setGroupAddress(InetAddress groupAddress)
+      {
+         this.groupAddress = groupAddress;
+         return this;
+      }
+
+      public UDPBroadcastEndpoint setGroupPort(int groupPort)
+      {
+         this.groupPort = groupPort;
+         return this;
+      }
+
+      public UDPBroadcastEndpoint setLocalBindAddress(InetAddress localAddress)
+      {
+         this.localAddress = localAddress;
+         return this;
+      }
+
+      public UDPBroadcastEndpoint setLocalBindPort(int localBindPort)
+      {
+         this.localBindPort = localBindPort;
+         return this;
+      }
+
+      /**
+       * Sends {@code data} as a single datagram to the group address and port.
+       * Requires {@link #openBroadcaster()} to have been called first.
+       *
+       * @param data the payload to send
+       * @throws Exception if the datagram cannot be sent
+       */
+      public void broadcast(byte[] data) throws Exception
+      {
+         DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
+         broadcastingSocket.send(packet);
+      }
+
+      /**
+       * Waits for the next datagram on the receiving socket.
+       * <p>
+       * The loop ends when a datagram arrives, when the endpoint is closed, or when a
+       * non-timeout I/O error occurs (logged only while the endpoint is still open).
+       *
+       * @return the full 65535-byte receive buffer, not trimmed to the datagram length; it is
+       *         left zero-filled when no datagram was received
+       * @throws Exception declared by the interface; I/O errors are handled internally
+       */
+      public byte[] receiveBroadcast() throws Exception
+      {
+         final byte[] data = new byte[65535];
+         final DatagramPacket packet = new DatagramPacket(data, data.length);
+
+         while (open)
+         {
+            try
+            {
+               receivingSocket.receive(packet);
+            }
+            // TODO: Do we need this?
+            catch (InterruptedIOException e)
+            {
+               // SocketTimeoutException is an InterruptedIOException, so an expired read
+               // timeout lands here and the loop re-checks the open flag
+               continue;
+            }
+            catch (IOException e)
+            {
+               if (open)
+               {
+                  ActiveMQClientLogger.LOGGER.warn(this + " getting exception when receiving broadcasting.", e);
+               }
+            }
+            break;
+         }
+         return data;
+      }
+
+      /**
+       * Delegates to {@link #receiveBroadcast()}; {@code time} and {@code unit} are ignored.
+       */
+      public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
+      {
+         // We just use the regular method on UDP, there's no timeout support
+         // and this is basically for tests only
+         return receiveBroadcast();
+      }
+
+      /**
+       * Creates the socket used by {@link #broadcast(byte[])} and marks the endpoint open.
+       * <p>
+       * The socket is bound to the local address and port only when a local port was
+       * configured. A local address without a port is reported through the logger and the
+       * socket is created unbound to any specific address.
+       *
+       * @throws Exception if the socket cannot be created
+       */
+      public void openBroadcaster() throws Exception
+      {
+         if (localBindPort != -1)
+         {
+            broadcastingSocket = new DatagramSocket(localBindPort, localAddress);
+         }
+         else
+         {
+            if (localAddress != null)
+            {
+               ActiveMQClientLogger.LOGGER.broadcastGroupBindError();
+            }
+            broadcastingSocket = new DatagramSocket();
+         }
+
+         open = true;
+      }
+
+      /**
+       * Creates the multicast socket used by {@link #receiveBroadcast()}, joins the group and
+       * marks the endpoint open.
+       * <p>
+       * If the socket cannot be configured after it was created, it is closed again before
+       * the failure is propagated.
+       *
+       * @throws Exception if the socket cannot be created or configured
+       */
+      public void openClient() throws Exception
+      {
+         MulticastSocket socket;
+
+         // HORNETQ-874: on these platforms first try to bind to the group address itself
+         // and fall back to a port-only bind when that fails
+         if (checkForLinux() || checkForSolaris() || checkForHp())
+         {
+            try
+            {
+               socket = new MulticastSocket(new InetSocketAddress(groupAddress, groupPort));
+            }
+            catch (IOException e)
+            {
+               ActiveMQClientLogger.LOGGER.ioDiscoveryError(groupAddress.getHostAddress(), groupAddress instanceof Inet4Address ? "IPv4" : "IPv6");
+
+               socket = new MulticastSocket(groupPort);
+            }
+         }
+         else
+         {
+            socket = new MulticastSocket(groupPort);
+         }
+
+         try
+         {
+            if (localAddress != null)
+            {
+               socket.setInterface(localAddress);
+            }
+
+            socket.joinGroup(groupAddress);
+
+            socket.setSoTimeout(SOCKET_TIMEOUT);
+         }
+         catch (Exception e)
+         {
+            // do not leave a bound socket behind when the endpoint could not be set up
+            socket.close();
+            throw e;
+         }
+
+         receivingSocket = socket;
+
+         open = true;
+      }
+
+      /**
+       * Marks the endpoint closed and closes whichever sockets were opened.
+       *
+       * @param isBroadcast currently ignored; both sockets are closed regardless
+       */
+      //@Todo: using isBroadcast to share endpoint between broadcast and receiving
+      public void close(boolean isBroadcast) throws Exception
+      {
+         open = false;
+
+         if (broadcastingSocket != null)
+         {
+            broadcastingSocket.close();
+         }
+
+         if (receivingSocket != null)
+         {
+            receivingSocket.close();
+         }
+      }
+
+      private static boolean checkForLinux()
+      {
+         return checkForPresence("os.name", "linux");
+      }
+
+      private static boolean checkForHp()
+      {
+         return checkForPresence("os.name", "hp");
+      }
+
+      private static boolean checkForSolaris()
+      {
+         return checkForPresence("os.name", "sun");
+      }
+
+      /**
+       * Tells whether the system property {@code key}, trimmed and lower-cased, starts with
+       * {@code value}.
+       *
+       * @param key   name of the system property to read
+       * @param value lower-case prefix to look for
+       * @return {@code false} when the property is unset or cannot be read
+       */
+      private static boolean checkForPresence(String key, String value)
+      {
+         try
+         {
+            String tmp = System.getProperty(key);
+            // Locale.ROOT keeps the comparison independent of the JVM's default locale
+            return tmp != null && tmp.trim().toLowerCase(java.util.Locale.ROOT).startsWith(value);
+         }
+         catch (Throwable t)
+         {
+            return false;
+         }
+      }
+
+   }
+
+   /**
+    * Hash over the group address and group port, matching {@link #equals(Object)}.
+    */
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((groupAddress == null) ? 0 : groupAddress.hashCode());
+      result = prime * result + groupPort;
+      return result;
+   }
+
+   /**
+    * Two factories are equal when they are of the same class and have the same group address
+    * and group port; the local bind settings are not compared.
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+      {
+         return true;
+      }
+      if (obj == null)
+      {
+         return false;
+      }
+      if (getClass() != obj.getClass())
+      {
+         return false;
+      }
+      UDPBroadcastEndpointFactory other = (UDPBroadcastEndpointFactory) obj;
+      if (groupAddress == null)
+      {
+         if (other.groupAddress != null)
+         {
+            return false;
+         }
+      }
+      else if (!groupAddress.equals(other.groupAddress))
+      {
+         return false;
+      }
+      if (groupPort != other.groupPort)
+      {
+         return false;
+      }
+      return true;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastGroupConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastGroupConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastGroupConfiguration.java
deleted file mode 100644
index 5c84d79..0000000
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/UDPBroadcastGroupConfiguration.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.api.core;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.core.client.ActiveMQClientLogger;
-
-
-/**
- * The configuration used to determine how the server will broadcast members.
- * <p>
- * This is analogous to {@link org.apache.activemq.api.core.DiscoveryGroupConfiguration}
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a> Created 18 Nov 2008 08:44:30
- */
-public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFactoryConfiguration, DiscoveryGroupConfigurationCompatibilityHelper
-{
- private static final long serialVersionUID = 1052413739064253955L;
-
- private transient String localBindAddress = null;
-
- private transient int localBindPort = -1;
-
- private String groupAddress = null;
-
- private int groupPort = -1;
-
- public UDPBroadcastGroupConfiguration()
- {
- }
-
- public BroadcastEndpointFactory createBroadcastEndpointFactory()
- {
- return new BroadcastEndpointFactory()
- {
- @Override
- public BroadcastEndpoint createBroadcastEndpoint() throws Exception
- {
- return new UDPBroadcastEndpoint()
- .setGroupAddress(groupAddress != null ? InetAddress.getByName(groupAddress) : null)
- .setGroupPort(groupPort)
- .setLocalBindAddress(localBindAddress != null ? InetAddress.getByName(localBindAddress) : null)
- .setLocalBindPort(localBindPort);
- }
- };
- }
-
- public String getGroupAddress()
- {
- return groupAddress;
- }
-
- public UDPBroadcastGroupConfiguration setGroupAddress(String groupAddress)
- {
- this.groupAddress = groupAddress;
- return this;
- }
-
- public int getGroupPort()
- {
- return groupPort;
- }
-
- public UDPBroadcastGroupConfiguration setGroupPort(int groupPort)
- {
- this.groupPort = groupPort;
- return this;
- }
-
- public int getLocalBindPort()
- {
- return localBindPort;
- }
-
- public UDPBroadcastGroupConfiguration setLocalBindPort(int localBindPort)
- {
- this.localBindPort = localBindPort;
- return this;
- }
-
- public String getLocalBindAddress()
- {
- return localBindAddress;
- }
-
- public UDPBroadcastGroupConfiguration setLocalBindAddress(String localBindAddress)
- {
- this.localBindAddress = localBindAddress;
- return this;
- }
-
- /**
- * <p> This is the member discovery implementation using direct UDP. It was extracted as a refactoring from
- * {@link org.apache.activemq.core.cluster.DiscoveryGroup}</p>
- *
- * @author Tomohisa
- * @author Howard Gao
- * @author Clebert Suconic
- */
- private static class UDPBroadcastEndpoint implements BroadcastEndpoint
- {
- private static final int SOCKET_TIMEOUT = 500;
-
- private InetAddress localAddress;
-
- private int localBindPort;
-
- private InetAddress groupAddress;
-
- private int groupPort;
-
- private DatagramSocket broadcastingSocket;
-
- private MulticastSocket receivingSocket;
-
- private volatile boolean open;
-
- public UDPBroadcastEndpoint()
- {
- }
-
- public UDPBroadcastEndpoint setGroupAddress(InetAddress groupAddress)
- {
- this.groupAddress = groupAddress;
- return this;
- }
-
- public UDPBroadcastEndpoint setGroupPort(int groupPort)
- {
- this.groupPort = groupPort;
- return this;
- }
-
- public UDPBroadcastEndpoint setLocalBindAddress(InetAddress localAddress)
- {
- this.localAddress = localAddress;
- return this;
- }
-
- public UDPBroadcastEndpoint setLocalBindPort(int localBindPort)
- {
- this.localBindPort = localBindPort;
- return this;
- }
-
-
- public void broadcast(byte[] data) throws Exception
- {
- DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
- broadcastingSocket.send(packet);
- }
-
- public byte[] receiveBroadcast() throws Exception
- {
- final byte[] data = new byte[65535];
- final DatagramPacket packet = new DatagramPacket(data, data.length);
-
- while (open)
- {
- try
- {
- receivingSocket.receive(packet);
- }
- // TODO: Do we need this?
- catch (InterruptedIOException e)
- {
- continue;
- }
- catch (IOException e)
- {
- if (open)
- {
- ActiveMQClientLogger.LOGGER.warn(this + " getting exception when receiving broadcasting.", e);
- }
- }
- break;
- }
- return data;
- }
-
- public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
- {
- // We just use the regular method on UDP, there's no timeout support
- // and this is basically for tests only
- return receiveBroadcast();
- }
-
- public void openBroadcaster() throws Exception
- {
- if (localBindPort != -1)
- {
- broadcastingSocket = new DatagramSocket(localBindPort, localAddress);
- }
- else
- {
- if (localAddress != null)
- {
- ActiveMQClientLogger.LOGGER.broadcastGroupBindError();
- }
- broadcastingSocket = new DatagramSocket();
- }
-
- open = true;
- }
-
- public void openClient() throws Exception
- {
- // HORNETQ-874
- if (checkForLinux() || checkForSolaris() || checkForHp())
- {
- try
- {
- receivingSocket = new MulticastSocket(new InetSocketAddress(groupAddress, groupPort));
- }
- catch (IOException e)
- {
- ActiveMQClientLogger.LOGGER.ioDiscoveryError(groupAddress.getHostAddress(), groupAddress instanceof Inet4Address ? "IPv4" : "IPv6");
-
- receivingSocket = new MulticastSocket(groupPort);
- }
- }
- else
- {
- receivingSocket = new MulticastSocket(groupPort);
- }
-
- if (localAddress != null)
- {
- receivingSocket.setInterface(localAddress);
- }
-
- receivingSocket.joinGroup(groupAddress);
-
- receivingSocket.setSoTimeout(SOCKET_TIMEOUT);
-
- open = true;
- }
-
- //@Todo: using isBroadcast to share endpoint between broadcast and receiving
- public void close(boolean isBroadcast) throws Exception
- {
- open = false;
-
- if (broadcastingSocket != null)
- {
- broadcastingSocket.close();
- }
-
- if (receivingSocket != null)
- {
- receivingSocket.close();
- }
- }
-
- private static boolean checkForLinux()
- {
- return checkForPresence("os.name", "linux");
- }
-
- private static boolean checkForHp()
- {
- return checkForPresence("os.name", "hp");
- }
-
- private static boolean checkForSolaris()
- {
- return checkForPresence("os.name", "sun");
- }
-
- private static boolean checkForPresence(String key, String value)
- {
- try
- {
- String tmp = System.getProperty(key);
- return tmp != null && tmp.trim().toLowerCase().startsWith(value);
- }
- catch (Throwable t)
- {
- return false;
- }
- }
-
- }
-
- @Override
- public int hashCode()
- {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((groupAddress == null) ? 0 : groupAddress.hashCode());
- result = prime * result + groupPort;
- return result;
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- UDPBroadcastGroupConfiguration other = (UDPBroadcastGroupConfiguration) obj;
- if (groupAddress == null)
- {
- if (other.groupAddress != null)
- return false;
- }
- else if (!groupAddress.equals(other.groupAddress))
- return false;
- if (groupPort != other.groupPort)
- return false;
- return true;
- }
-}