You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/02/12 21:01:46 UTC
[6/8] activemq-6 git commit: ACTIVEMQ6-7 - Improve Serialization on
Connection Factory
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ActiveMQClient.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ActiveMQClient.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ActiveMQClient.java
index 1dc004e..90c0f6e 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ActiveMQClient.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ActiveMQClient.java
@@ -21,6 +21,9 @@ import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
import org.apache.activemq.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.uri.ServerLocatorParser;
+
+import java.net.URI;
/**
* Utility class for creating ActiveMQ {@link ClientSessionFactory} objects.
@@ -113,6 +116,17 @@ public final class ActiveMQClient
public static final String DEFAULT_CORE_PROTOCOL = "CORE";
/**
+ * Creates a ActiveMQConnectionFactory;
+ *
+ * @return the ActiveMQConnectionFactory
+ */
+ public static ServerLocator createServerLocator(final String url) throws Exception
+ {
+ ServerLocatorParser parser = new ServerLocatorParser();
+ return parser.newObject(new URI(url));
+ }
+
+ /**
* Create a ServerLocator which creates session factories using a static list of transportConfigurations, the ServerLocator is not updated automatically
* as the cluster topology changes, and no HA backup information is propagated to the client
*
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/TopologyMember.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/TopologyMember.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/TopologyMember.java
index dd9e95a..72b1374 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/TopologyMember.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/TopologyMember.java
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.api.core.client;
-import java.io.Serializable;
-
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.spi.core.protocol.RemotingConnection;
@@ -27,7 +25,7 @@ import org.apache.activemq.spi.core.protocol.RemotingConnection;
* Each TopologyMember represents a single server and possibly any backup server that may take over
* its duties (using the nodeId of the original server).
*/
-public interface TopologyMember extends Serializable
+public interface TopologyMember
{
/**
* Returns the {@code backup-group-name} of the live server and backup servers associated with
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ServerLocatorImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ServerLocatorImpl.java
index 268f9c7..db0b34c 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ServerLocatorImpl.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ServerLocatorImpl.java
@@ -72,13 +72,8 @@ import org.apache.activemq.utils.UUIDGenerator;
*
* @author Tim Fox
*/
-public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
+public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener
{
- /*needed for backward compatibility*/
- @SuppressWarnings("unused")
- private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
-
- /*end of compatibility fixes*/
private enum STATE
{
INITIALIZED, CLOSED, CLOSING
@@ -398,7 +393,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private static DiscoveryGroup createDiscoveryGroup(String nodeID, DiscoveryGroupConfiguration config) throws Exception
{
DiscoveryGroup group = new DiscoveryGroup(nodeID, config.getName(),
- config.getRefreshTimeout(), config.getBroadcastEndpointFactoryConfiguration().createBroadcastEndpointFactory(), null);
+ config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null);
return group;
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/ClientProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/ClientProtocolManagerFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/ClientProtocolManagerFactory.java
index 9eeffd5..e90488b 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/ClientProtocolManagerFactory.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/ClientProtocolManagerFactory.java
@@ -16,12 +16,10 @@
*/
package org.apache.activemq.spi.core.remoting;
-import java.io.Serializable;
-
/**
* @author Clebert Suconic
*/
-public interface ClientProtocolManagerFactory extends Serializable
+public interface ClientProtocolManagerFactory
{
ClientProtocolManager newProtocolManager();
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/uri/AbstractServerLocatorSchema.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/uri/AbstractServerLocatorSchema.java b/activemq-core-client/src/main/java/org/apache/activemq/uri/AbstractServerLocatorSchema.java
new file mode 100644
index 0000000..01297f4
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/uri/AbstractServerLocatorSchema.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.uri;
+
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.utils.uri.URISchema;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public abstract class AbstractServerLocatorSchema extends URISchema<ServerLocator>
+{
+ protected ConnectionOptions newConnectionOptions(URI uri, Map<String, String> query) throws Exception
+ {
+ return setData(uri, new ConnectionOptions(), query);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/uri/ConnectionOptions.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/uri/ConnectionOptions.java b/activemq-core-client/src/main/java/org/apache/activemq/uri/ConnectionOptions.java
new file mode 100644
index 0000000..9c69ad2
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/uri/ConnectionOptions.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.uri;
+/**
+ * This will represent all the possible options you could setup on URLs
+ * When parsing the URL this will serve as an intermediate object
+ * And it could also be a pl
+ * @author clebertsuconic
+ */
+
+public class ConnectionOptions
+{
+
+ private boolean ha;
+
+ private String host;
+
+ private int port;
+
+ public ConnectionOptions setHost(String host)
+ {
+ this.host = host;
+ return this;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+
+ public ConnectionOptions setPort(int port)
+ {
+ this.port = port;
+ return this;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public boolean isHa()
+ {
+ return ha;
+ }
+
+ public void setHa(boolean ha)
+ {
+ this.ha = ha;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ConnectionOptions{" +
+ "ha=" + ha +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/uri/InVMServerLocatorSchema.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/uri/InVMServerLocatorSchema.java b/activemq-core-client/src/main/java/org/apache/activemq/uri/InVMServerLocatorSchema.java
new file mode 100644
index 0000000..54c3254
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/uri/InVMServerLocatorSchema.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.uri;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ActiveMQClient;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.utils.uri.SchemaConstants;
+import org.apache.activemq.utils.uri.URISchema;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class InVMServerLocatorSchema extends AbstractServerLocatorSchema
+{
+ @Override
+ public String getSchemaName()
+ {
+ return SchemaConstants.VM;
+ }
+
+ @Override
+ protected ServerLocator internalNewObject(URI uri, Map<String, String> query) throws Exception
+ {
+ TransportConfiguration tc = createTransportConfiguration(uri);
+ ServerLocator factory = ActiveMQClient.createServerLocatorWithoutHA(tc);
+ return URISchema.setData(uri, factory, query);
+ }
+
+ public static TransportConfiguration createTransportConfiguration(URI uri)
+ {
+ Map<String, Object> inVmTransportConfig = new HashMap<>();
+ inVmTransportConfig.put("serverId", uri.getHost());
+ return new TransportConfiguration("org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory", inVmTransportConfig);
+ }
+
+ @Override
+ protected URI internalNewURI(ServerLocator bean) throws Exception
+ {
+ return getUri(bean.getStaticTransportConfigurations());
+ }
+
+ public static URI getUri(TransportConfiguration[] configurations) throws URISyntaxException
+ {
+ String host = "0";
+ if (configurations != null && configurations.length > 0)
+ {
+ TransportConfiguration configuration = configurations[0];
+ Map<String, Object> params = configuration.getParams();
+ host = params.get("serverId") == null ? host : params.get("serverId").toString();
+ }
+ return new URI(SchemaConstants.VM, null, host, -1, null, null, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/uri/JGroupsServerLocatorSchema.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/uri/JGroupsServerLocatorSchema.java b/activemq-core-client/src/main/java/org/apache/activemq/uri/JGroupsServerLocatorSchema.java
new file mode 100644
index 0000000..b840bcb
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/uri/JGroupsServerLocatorSchema.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.uri;
+
+import org.apache.activemq.api.core.BroadcastEndpointFactory;
+import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq.api.core.JGroupsFileBroadcastEndpointFactory;
+import org.apache.activemq.api.core.JGroupsPropertiesBroadcastEndpointFactory;
+import org.apache.activemq.api.core.client.ActiveMQClient;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.utils.uri.SchemaConstants;
+import org.apache.activemq.utils.uri.URISchema;
+
+import java.io.NotSerializableException;
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class JGroupsServerLocatorSchema extends AbstractServerLocatorSchema
+{
+ @Override
+ public String getSchemaName()
+ {
+ return SchemaConstants.JGROUPS;
+ }
+
+ @Override
+ protected ServerLocator internalNewObject(URI uri, Map<String, String> query) throws Exception
+ {
+ ConnectionOptions options = newConnectionOptions(uri, query);
+
+ DiscoveryGroupConfiguration dcConfig = getDiscoveryGroupConfiguration(uri, query);
+
+ if (options.isHa())
+ {
+ return ActiveMQClient.createServerLocatorWithHA(dcConfig);
+ }
+ else
+ {
+ return ActiveMQClient.createServerLocatorWithoutHA(dcConfig);
+ }
+ }
+
+ @Override
+ protected URI internalNewURI(ServerLocator bean) throws Exception
+ {
+ DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration();
+ BroadcastEndpointFactory endpoint = dgc.getBroadcastEndpointFactory();
+ String auth;
+ if (endpoint instanceof JGroupsFileBroadcastEndpointFactory)
+ {
+ auth = ((JGroupsFileBroadcastEndpointFactory) endpoint).getChannelName();
+ }
+ else if (endpoint instanceof JGroupsPropertiesBroadcastEndpointFactory)
+ {
+ auth = ((JGroupsPropertiesBroadcastEndpointFactory) endpoint).getChannelName();
+ }
+ else
+ {
+ throw new NotSerializableException(endpoint + "not serializable");
+ }
+ String query = URISchema.getData(null, bean, dgc, endpoint);
+ dgc.setBroadcastEndpointFactory(endpoint);
+ return new URI(SchemaConstants.JGROUPS, null, auth, -1, null, query, null);
+ }
+
+ public static DiscoveryGroupConfiguration getDiscoveryGroupConfiguration(URI uri, Map<String, String> query) throws Exception
+ {
+ BroadcastEndpointFactory endpointFactory;
+ if (query.containsKey("file"))
+ {
+ endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(uri.getAuthority());
+ }
+ else
+ {
+ endpointFactory = new JGroupsPropertiesBroadcastEndpointFactory().setChannelName(uri.getAuthority());
+ }
+
+ URISchema.setData(uri, endpointFactory, query);
+
+ DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setBroadcastEndpointFactory(endpointFactory);
+
+ URISchema.setData(uri, dcConfig, query);
+ return dcConfig;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/uri/ServerLocatorParser.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/uri/ServerLocatorParser.java b/activemq-core-client/src/main/java/org/apache/activemq/uri/ServerLocatorParser.java
new file mode 100644
index 0000000..c77250a
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/uri/ServerLocatorParser.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.uri;
+
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.utils.uri.URIFactory;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class ServerLocatorParser extends URIFactory<ServerLocator>
+{
+ public ServerLocatorParser()
+ {
+ registerSchema(new TCPServerLocatorSchema());
+ registerSchema(new UDPServerLocatorSchema());
+ registerSchema(new JGroupsServerLocatorSchema());
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/uri/TCPServerLocatorSchema.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/uri/TCPServerLocatorSchema.java b/activemq-core-client/src/main/java/org/apache/activemq/uri/TCPServerLocatorSchema.java
new file mode 100644
index 0000000..ee92c84
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/uri/TCPServerLocatorSchema.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.uri;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ActiveMQClient;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.utils.uri.SchemaConstants;
+import org.apache.activemq.utils.uri.URISchema;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class TCPServerLocatorSchema extends AbstractServerLocatorSchema
+{
+ @Override
+ public String getSchemaName()
+ {
+ return SchemaConstants.TCP;
+ }
+
+ @Override
+ protected ServerLocator internalNewObject(URI uri, Map<String, String> query) throws Exception
+ {
+ ConnectionOptions options = newConnectionOptions(uri, query);
+
+ TransportConfiguration[] configurations = getTransportConfigurations(uri, query);
+
+ if (options.isHa())
+ {
+ return ActiveMQClient.createServerLocatorWithHA(configurations);
+ }
+ else
+ {
+ return ActiveMQClient.createServerLocatorWithoutHA(configurations);
+ }
+ }
+
+ public static TransportConfiguration[] getTransportConfigurations(URI uri, Map<String, String> query) throws URISyntaxException
+ {
+ HashMap<String, Object> props = new HashMap<>();
+
+ URISchema.setData(uri, props, TransportConstants.ALLOWABLE_CONNECTOR_KEYS, query);
+ List<TransportConfiguration> transportConfigurations = new ArrayList<>();
+
+ transportConfigurations.add(new TransportConfiguration(NettyConnectorFactory.class.getName(),
+ props,
+ uri.toString()));
+ String connectors = uri.getFragment();
+
+ if (connectors != null)
+ {
+ String[] split = connectors.split(",");
+ for (String s : split)
+ {
+ URI extraUri = new URI(s);
+ HashMap<String, Object> newProps = new HashMap<>();
+ URISchema.setData(extraUri, newProps, TransportConstants.ALLOWABLE_CONNECTOR_KEYS, query);
+ URISchema.setData(extraUri, newProps, TransportConstants.ALLOWABLE_CONNECTOR_KEYS, URISchema.parseQuery(extraUri.getQuery(), null));
+ transportConfigurations.add(new TransportConfiguration(NettyConnectorFactory.class.getName(),
+ newProps,
+ extraUri.toString()));
+ }
+ }
+ TransportConfiguration[] configurations = new TransportConfiguration[transportConfigurations.size()];
+ transportConfigurations.toArray(configurations);
+ return configurations;
+ }
+
+ @Override
+ protected URI internalNewURI(ServerLocator bean) throws Exception
+ {
+ String query = URISchema.getData(null, bean);
+ TransportConfiguration[] staticConnectors = bean.getStaticTransportConfigurations();
+ return getURI(query, staticConnectors);
+ }
+
+ public static URI getURI(String query, TransportConfiguration[] staticConnectors) throws Exception
+ {
+ if (staticConnectors == null || staticConnectors.length < 1)
+ {
+ throw new Exception();
+ }
+ StringBuilder fragment = new StringBuilder();
+ for (int i = 1; i < staticConnectors.length; i++)
+ {
+ TransportConfiguration connector = staticConnectors[i];
+ Map<String, Object> params = connector.getParams();
+ URI extraUri = new URI(SchemaConstants.TCP, null, getHost(params), getPort(params), null, createQuery(params, null), null);
+ if (i > 1)
+ {
+ fragment.append(",");
+ }
+ fragment.append(extraUri.toASCIIString());
+
+ }
+ Map<String, Object> params = staticConnectors[0].getParams();
+ return new URI(SchemaConstants.TCP, null, getHost(params), getPort(params), null, createQuery(params, query), fragment.toString());
+ }
+
+ private static int getPort(Map<String, Object> params)
+ {
+ Object port = params.get("port");
+ if (port instanceof String)
+ {
+ return Integer.valueOf((String) port);
+ }
+ return port != null ? (int) port : 5445;
+ }
+
+ private static String getHost(Map<String, Object> params)
+ {
+ return params.get("host") != null ? (String) params.get("host") : "localhost";
+ }
+
+ private static String createQuery(Map<String, Object> params, String query)
+ {
+ StringBuilder cb;
+ if (query == null)
+ {
+ cb = new StringBuilder();
+ }
+ else
+ {
+ cb = new StringBuilder(query);
+ }
+ for (String param : params.keySet())
+ {
+ if (cb.length() > 0)
+ {
+ cb.append("&");
+ }
+ cb.append(param).append("=").append(params.get(param));
+ }
+ return cb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-core-client/src/main/java/org/apache/activemq/uri/UDPServerLocatorSchema.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/uri/UDPServerLocatorSchema.java b/activemq-core-client/src/main/java/org/apache/activemq/uri/UDPServerLocatorSchema.java
new file mode 100644
index 0000000..ef1d96b
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/uri/UDPServerLocatorSchema.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.uri;
+
+import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq.api.core.UDPBroadcastEndpointFactory;
+import org.apache.activemq.api.core.client.ActiveMQClient;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.utils.uri.SchemaConstants;
+import org.apache.activemq.utils.uri.URISchema;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class UDPServerLocatorSchema extends AbstractServerLocatorSchema
+{
+ protected static List<String> IGNORED = new ArrayList<>();
+ static
+ {
+ IGNORED.add("localBindAddress");
+ IGNORED.add("localBindPort");
+ }
+ @Override
+ public String getSchemaName()
+ {
+ return SchemaConstants.UDP;
+ }
+
+ @Override
+ protected ServerLocator internalNewObject(URI uri, Map<String, String> query) throws Exception
+ {
+ ConnectionOptions options = newConnectionOptions(uri, query);
+
+ DiscoveryGroupConfiguration dgc = getDiscoveryGroupConfiguration(uri, query, getHost(uri), getPort(uri));
+
+ if (options.isHa())
+ {
+ return ActiveMQClient.createServerLocatorWithHA(dgc);
+ }
+ else
+ {
+ return ActiveMQClient.createServerLocatorWithoutHA(dgc);
+ }
+ }
+
+ @Override
+ protected URI internalNewURI(ServerLocator bean) throws Exception
+ {
+ DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration();
+ UDPBroadcastEndpointFactory endpoint = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory();
+ dgc.setBroadcastEndpointFactory(endpoint);
+ String query = URISchema.getData(IGNORED, bean, dgc, endpoint);
+ return new URI(SchemaConstants.UDP, null, endpoint.getGroupAddress(), endpoint.getGroupPort(), null, query, null);
+ }
+
+ public static DiscoveryGroupConfiguration getDiscoveryGroupConfiguration(URI uri, Map<String, String> query, String host, int port) throws Exception
+ {
+ UDPBroadcastEndpointFactory endpointFactoryConfiguration = new UDPBroadcastEndpointFactory()
+ .setGroupAddress(host)
+ .setGroupPort(port);
+
+ URISchema.setData(uri, endpointFactoryConfiguration, query);
+
+ DiscoveryGroupConfiguration dgc = URISchema.setData(uri, new DiscoveryGroupConfiguration(), query)
+ .setBroadcastEndpointFactory(endpointFactoryConfiguration);
+
+ URISchema.setData(uri, dgc, query);
+ return dgc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-jms-client/pom.xml b/activemq-jms-client/pom.xml
index c40729a..629b8ee 100644
--- a/activemq-jms-client/pom.xml
+++ b/activemq-jms-client/pom.xml
@@ -41,6 +41,13 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core-client</artifactId>
<version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core-client</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/ActiveMQJMSClient.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/ActiveMQJMSClient.java b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/ActiveMQJMSClient.java
index 8649459..181de27 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/ActiveMQJMSClient.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/ActiveMQJMSClient.java
@@ -29,6 +29,9 @@ import org.apache.activemq.jms.client.ActiveMQTopicConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQXAConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQXAQueueConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQXATopicConnectionFactory;
+import org.apache.activemq.uri.ConnectionFactoryParser;
+
+import java.net.URI;
/**
* A utility class for creating ActiveMQ client-side JMS managed resources.
@@ -37,6 +40,16 @@ import org.apache.activemq.jms.client.ActiveMQXATopicConnectionFactory;
*/
public class ActiveMQJMSClient
{
+ /**
+ * Creates a ActiveMQConnectionFactory;
+ *
+ * @return the ActiveMQConnectionFactory
+ */
+ public static ActiveMQConnectionFactory createConnectionFactory(final String url) throws Exception
+ {
+ ConnectionFactoryParser parser = new ConnectionFactoryParser();
+ return parser.newObject(new URI(url));
+ }
/**
* Creates a ActiveMQConnectionFactory that receives cluster topology updates from the cluster as
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionFactory.java
index 5ec04d0..5383077 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionFactory.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionFactory.java
@@ -33,16 +33,24 @@ import javax.jms.XATopicConnection;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
-import java.io.Serializable;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InvalidObjectException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.net.URI;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.api.jms.JMSFactoryType;
import org.apache.activemq.jms.referenceable.ConnectionFactoryObjectFactory;
import org.apache.activemq.jms.referenceable.SerializableObjectRefAddr;
+import org.apache.activemq.uri.ConnectionFactoryParser;
+import org.apache.activemq.uri.ServerLocatorParser;
/**
* ActiveMQ implementation of a JMS ConnectionFactory.
@@ -50,11 +58,9 @@ import org.apache.activemq.jms.referenceable.SerializableObjectRefAddr;
* @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*/
-public class ActiveMQConnectionFactory implements Serializable, Referenceable, ConnectionFactory, XAConnectionFactory
+public class ActiveMQConnectionFactory implements Externalizable, Referenceable, ConnectionFactory, XAConnectionFactory
{
- private static final long serialVersionUID = -2810634789345348326L;
-
- private final ServerLocator serverLocator;
+ private ServerLocator serverLocator;
private String clientID;
@@ -62,7 +68,58 @@ public class ActiveMQConnectionFactory implements Serializable, Referenceable, C
private int transactionBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE;
- private boolean readOnly;
+ private boolean readOnly;
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ ConnectionFactoryParser parser = new ConnectionFactoryParser();
+ String scheme;
+ if (serverLocator.getDiscoveryGroupConfiguration() != null)
+ {
+ if (serverLocator.getDiscoveryGroupConfiguration().getBroadcastEndpointFactory() instanceof UDPBroadcastEndpointFactory)
+ {
+ scheme = "udp";
+ }
+ else
+ {
+ scheme = "jgroups";
+ }
+ }
+ else
+ {
+ scheme = "tcp";
+ }
+ try
+ {
+ URI uri = parser.createSchema(scheme, this);
+ out.writeUTF(uri.toASCIIString());
+ }
+ catch (Exception e)
+ {
+ if (e instanceof IOException)
+ {
+ throw (IOException) e;
+ }
+ throw new IOException(e);
+ }
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ String url = in.readUTF();
+ ConnectionFactoryParser parser = new ConnectionFactoryParser();
+ ServerLocatorParser locatorParser = new ServerLocatorParser();
+ try
+ {
+ URI uri = new URI(url);
+ serverLocator = locatorParser.newObject(uri);
+ parser.populateObject(uri, this);
+ }
+ catch (Exception e)
+ {
+ throw new InvalidObjectException(e.getMessage());
+ }
+ }
public ActiveMQConnectionFactory()
{
@@ -559,7 +616,6 @@ public class ActiveMQConnectionFactory implements Serializable, Referenceable, C
public synchronized int getInitialConnectAttempts()
{
- checkWrite();
return serverLocator.getInitialConnectAttempts();
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConnectionFactory.java
index d762cbc..0bc852b 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConnectionFactory.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQJMSConnectionFactory.java
@@ -67,5 +67,4 @@ public class ActiveMQJMSConnectionFactory extends ActiveMQConnectionFactory impl
{
super(ha, initialConnectors);
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java
index a600b31..2f9735e 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jndi/ActiveMQInitialContextFactory.java
@@ -21,34 +21,14 @@ import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.spi.InitialContextFactory;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.Properties;
-import java.util.StringTokenizer;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
-import org.apache.activemq.api.core.JGroupsBroadcastGroupConfiguration;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
-import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
-import org.apache.activemq.api.jms.JMSFactoryType;
-import org.apache.activemq.core.client.ActiveMQClientLogger;
-import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.uri.ConnectionFactoryParser;
/**
* A factory of the ActiveMQ InitialContext which contains
@@ -59,52 +39,40 @@ import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
*/
public class ActiveMQInitialContextFactory implements InitialContextFactory
{
- public static final String CONNECTION_FACTORY_NAMES = "connectionFactoryNames";
public static final String REFRESH_TIMEOUT = "refreshTimeout";
public static final String DISCOVERY_INITIAL_WAIT_TIMEOUT = "discoveryInitialWaitTimeout";
-
- private static final String[] DEFAULT_CONNECTION_FACTORY_NAMES = {"ConnectionFactory", "XAConnectionFactory", "QueueConnectionFactory", "TopicConnectionFactory"};
- public static final String TCP_SCHEME = "tcp";
- public static final String JGROUPS_SCHEME = "jgroups";
- public static final String UDP_SCHEME = "udp";
- public static final String VM_SCHEME = "vm";
- public static final String HA = "ha";
- public static final String CF_TYPE = "type";
- public static final String QUEUE_CF = "QUEUE_CF";
- public static final String TOPIC_CF = "TOPIC_CF";
- public static final String QUEUE_XA_CF = "QUEUE_XA_CF";
- public static final String TOPIC_XA_CF = "TOPIC_XA_CF";
- public static final String XA_CF = "XA_CF";
public static final String DYNAMIC_QUEUE_CONTEXT = "dynamicQueues";
public static final String DYNAMIC_TOPIC_CONTEXT = "dynamicTopics";
-
- private String connectionPrefix = "connection.";
+ private String connectionFactoryPrefix = "connectionFactory.";
private String queuePrefix = "queue.";
private String topicPrefix = "topic.";
public Context getInitialContext(Hashtable environment) throws NamingException
{
// lets create a factory
- Map<String, Object> data = new ConcurrentHashMap<String, Object>();
- String[] names = getConnectionFactoryNames(environment);
- for (int i = 0; i < names.length; i++)
+ Map<String, Object> data = new ConcurrentHashMap<>();
+ for (Iterator iter = environment.entrySet().iterator(); iter.hasNext(); )
{
- ActiveMQConnectionFactory factory = null;
- String name = names[i];
-
- try
- {
- factory = createConnectionFactory(name, environment);
- }
- catch (Exception e)
+ Map.Entry entry = (Map.Entry) iter.next();
+ String key = entry.getKey().toString();
+ if (key.startsWith(connectionFactoryPrefix))
{
- e.printStackTrace();
- throw new NamingException("Invalid broker URL");
+ String jndiName = key.substring(connectionFactoryPrefix.length());
+ try
+ {
+ ActiveMQConnectionFactory factory = createConnectionFactory((String) environment.get(key));
+ data.put(jndiName, factory);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw new NamingException("Invalid broker URL");
+ }
}
-
- data.put(name, factory);
}
+
+
createQueues(data, environment);
createTopics(data, environment);
@@ -160,58 +128,6 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory
return new ReadOnlyContext(environment, data);
}
- protected ActiveMQConnectionFactory createConnectionFactory(String name, Hashtable environment) throws URISyntaxException, MalformedURLException
- {
- Hashtable connectionFactoryProperties = new Hashtable(environment);
- if (DEFAULT_CONNECTION_FACTORY_NAMES[1].equals(name))
- {
- connectionFactoryProperties.put(CF_TYPE, XA_CF);
- }
- if (DEFAULT_CONNECTION_FACTORY_NAMES[2].equals(name))
- {
- connectionFactoryProperties.put(CF_TYPE, QUEUE_CF);
- }
- if (DEFAULT_CONNECTION_FACTORY_NAMES[3].equals(name))
- {
- connectionFactoryProperties.put(CF_TYPE, TOPIC_CF);
- }
- String prefix = connectionPrefix + name + ".";
- for (Iterator iter = environment.entrySet().iterator(); iter.hasNext(); )
- {
- Map.Entry entry = (Map.Entry) iter.next();
- String key = (String) entry.getKey();
- if (key.startsWith(prefix))
- {
- // Rename the key...
- connectionFactoryProperties.remove(key);
- key = key.substring(prefix.length());
- connectionFactoryProperties.put(key, entry.getValue());
- }
- }
- return createConnectionFactory(connectionFactoryProperties);
- }
-
- protected String[] getConnectionFactoryNames(Map environment)
- {
- String factoryNames = (String) environment.get(CONNECTION_FACTORY_NAMES);
- if (factoryNames != null)
- {
- List<String> list = new ArrayList<String>();
- for (StringTokenizer enumeration = new StringTokenizer(factoryNames, ","); enumeration.hasMoreTokens(); )
- {
- list.add(enumeration.nextToken().trim());
- }
- int size = list.size();
- if (size > 0)
- {
- String[] answer = new String[size];
- list.toArray(answer);
- return answer;
- }
- }
- return DEFAULT_CONNECTION_FACTORY_NAMES;
- }
-
protected void createQueues(Map<String, Object> data, Hashtable environment)
{
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext(); )
@@ -259,238 +175,9 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory
/**
* Factory method to create a new connection factory from the given environment
*/
- protected ActiveMQConnectionFactory createConnectionFactory(Hashtable environment) throws URISyntaxException, MalformedURLException
+ protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception
{
- ActiveMQConnectionFactory connectionFactory;
- Map transportConfig = new HashMap();
-
- if (environment.containsKey(Context.PROVIDER_URL))
- {
- URI providerURI = new URI(((String)environment.get(Context.PROVIDER_URL)));
-
- if (providerURI.getQuery() != null)
- {
- try
- {
- transportConfig = parseQuery(providerURI.getQuery());
- }
- catch (URISyntaxException e)
- {
- }
- }
-
- if (providerURI.getScheme().equals(TCP_SCHEME))
- {
- String[] connectors = providerURI.getAuthority().split(",");
- TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectors.length];
- for (int i = 0; i < connectors.length; i++)
- {
- Map individualTransportConfig = new HashMap(transportConfig);
- String[] hostAndPort = connectors[i].split(":");
- individualTransportConfig.put(TransportConstants.HOST_PROP_NAME, hostAndPort[0]);
- individualTransportConfig.put(TransportConstants.PORT_PROP_NAME, hostAndPort[1]);
- transportConfigurations[i] = new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), individualTransportConfig);
- }
-
- if (Boolean.TRUE.equals(environment.get(HA)))
- {
- connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithHA(getJmsFactoryType(environment), transportConfigurations);
- }
- else
- {
- connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(getJmsFactoryType(environment), transportConfigurations);
- }
- }
- else if (providerURI.getScheme().equals(UDP_SCHEME))
- {
- DiscoveryGroupConfiguration dgc = new DiscoveryGroupConfiguration()
- .setRefreshTimeout(transportConfig.containsKey(REFRESH_TIMEOUT) ? Long.parseLong((String) transportConfig.get(REFRESH_TIMEOUT)) : ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT)
- .setDiscoveryInitialWaitTimeout(transportConfig.containsKey(DISCOVERY_INITIAL_WAIT_TIMEOUT) ? Long.parseLong((String) transportConfig.get(DISCOVERY_INITIAL_WAIT_TIMEOUT)) : ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT)
- .setBroadcastEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration()
- .setGroupAddress(providerURI.getHost())
- .setGroupPort(providerURI.getPort())
- .setLocalBindAddress(transportConfig.containsKey(TransportConstants.LOCAL_ADDRESS_PROP_NAME) ? (String) transportConfig.get(TransportConstants.LOCAL_ADDRESS_PROP_NAME) : null)
- .setLocalBindPort(transportConfig.containsKey(TransportConstants.LOCAL_PORT_PROP_NAME) ? Integer.parseInt((String) transportConfig.get(TransportConstants.LOCAL_PORT_PROP_NAME)) : -1));
- if (Boolean.TRUE.equals(environment.get(HA)))
- {
- connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithHA(dgc, getJmsFactoryType(environment));
- }
- else
- {
- connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dgc, getJmsFactoryType(environment));
- }
- }
- else if (providerURI.getScheme().equals(JGROUPS_SCHEME))
- {
- JGroupsBroadcastGroupConfiguration config = new JGroupsBroadcastGroupConfiguration(providerURI.getAuthority(), providerURI.getPath() != null ? providerURI.getPath() : UUID.randomUUID().toString());
-
- DiscoveryGroupConfiguration dgc = new DiscoveryGroupConfiguration()
- .setRefreshTimeout(transportConfig.containsKey(REFRESH_TIMEOUT) ? Long.parseLong((String) transportConfig.get(REFRESH_TIMEOUT)) : ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT)
- .setDiscoveryInitialWaitTimeout(transportConfig.containsKey(DISCOVERY_INITIAL_WAIT_TIMEOUT) ? Long.parseLong((String) transportConfig.get(DISCOVERY_INITIAL_WAIT_TIMEOUT)) : ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT)
- .setBroadcastEndpointFactoryConfiguration(config);
- if (Boolean.TRUE.equals(environment.get(HA)))
- {
- connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithHA(dgc, getJmsFactoryType(environment));
- }
- else
- {
- connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dgc, getJmsFactoryType(environment));
- }
- }
- else if (providerURI.getScheme().equals(VM_SCHEME))
- {
- Map inVmTransportConfig = new HashMap();
- inVmTransportConfig.put("serverId", providerURI.getHost());
- TransportConfiguration tc = new TransportConfiguration("org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory", inVmTransportConfig);
- connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(getJmsFactoryType(environment), tc);
- }
- else
- {
- throw new IllegalArgumentException("Invalid scheme");
- }
- }
- else
- {
- TransportConfiguration tc = new TransportConfiguration("org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory");
- connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(getJmsFactoryType(environment), tc);
- }
-
- Properties properties = new Properties();
- properties.putAll(environment);
-
- for (Object key : environment.keySet())
- {
- invokeSetter(connectionFactory, (String) key, environment.get(key));
- }
-
- return connectionFactory;
- }
-
- private JMSFactoryType getJmsFactoryType(Hashtable environment)
- {
- JMSFactoryType ultimateType = JMSFactoryType.CF; // default value
- if (environment.containsKey(CF_TYPE))
- {
- String tempType = (String) environment.get(CF_TYPE);
- if (QUEUE_CF.equals(tempType))
- {
- ultimateType = JMSFactoryType.QUEUE_CF;
- }
- else if (TOPIC_CF.equals(tempType))
- {
- ultimateType = JMSFactoryType.TOPIC_CF;
- }
- else if (QUEUE_XA_CF.equals(tempType))
- {
- ultimateType = JMSFactoryType.QUEUE_XA_CF;
- }
- else if (TOPIC_XA_CF.equals(tempType))
- {
- ultimateType = JMSFactoryType.TOPIC_XA_CF;
- }
- else if (XA_CF.equals(tempType))
- {
- ultimateType = JMSFactoryType.XA_CF;
- }
- }
- return ultimateType;
- }
-
-
- public static Map<String, String> parseQuery(String uri) throws URISyntaxException
- {
- try
- {
- uri = uri.substring(uri.lastIndexOf("?") + 1); // get only the relevant part of the query
- Map<String, String> rc = new HashMap<String, String>();
- if (uri != null && !uri.isEmpty())
- {
- String[] parameters = uri.split("&");
- for (int i = 0; i < parameters.length; i++)
- {
- int p = parameters[i].indexOf("=");
- if (p >= 0)
- {
- String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8");
- String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8");
- rc.put(name, value);
- }
- else
- {
- rc.put(parameters[i], null);
- }
- }
- }
- return rc;
- }
- catch (UnsupportedEncodingException e)
- {
- throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
- }
- }
-
- public String getConnectionPrefix()
- {
- return connectionPrefix;
- }
-
- public void setConnectionPrefix(String connectionPrefix)
- {
- this.connectionPrefix = connectionPrefix;
- }
-
- private void invokeSetter(Object target, final String propertyName, final Object propertyValue)
- {
- Method setter = null;
-
- Method[] methods = target.getClass().getMethods();
-
- // turn something like "consumerWindowSize" to "setConsumerWindowSize"
- String setterMethodName = "set" + Character.toUpperCase(propertyName.charAt(0)) + propertyName.substring(1);
-
- for (Method m : methods)
- {
- if (m.getName().equals(setterMethodName))
- {
- setter = m;
- break;
- }
- }
-
- try
- {
- if (setter != null)
- {
- ActiveMQClientLogger.LOGGER.info("Invoking: " + setter + " that takes a " + setter.getParameterTypes()[0] + " with a " + propertyValue.getClass());
- if (propertyValue.getClass() == String.class && setter.getParameterTypes()[0] != String.class)
- {
- String stringPropertyValue = (String) propertyValue;
- if (setter.getParameterTypes()[0] == Integer.TYPE)
- {
- setter.invoke(target, Integer.parseInt(stringPropertyValue));
- }
- else if (setter.getParameterTypes()[0] == Long.TYPE)
- {
- setter.invoke(target, Long.parseLong(stringPropertyValue));
- }
- else if (setter.getParameterTypes()[0] == Double.TYPE)
- {
- setter.invoke(target, Double.parseDouble(stringPropertyValue));
- }
- else if (setter.getParameterTypes()[0] == Boolean.TYPE)
- {
- setter.invoke(target, Boolean.parseBoolean(stringPropertyValue));
- }
- }
- else
- {
- setter.invoke(target, propertyValue);
- }
- }
- }
- catch (Exception e)
- {
- ActiveMQClientLogger.LOGGER.warn("Caught exception during invocation of: " + setter, e);
- }
+ ConnectionFactoryParser parser = new ConnectionFactoryParser();
+ return parser.newObject(uri);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/uri/AbstractCFSchema.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/uri/AbstractCFSchema.java b/activemq-jms-client/src/main/java/org/apache/activemq/uri/AbstractCFSchema.java
index 3e4c01e..7608afb 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/uri/AbstractCFSchema.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/uri/AbstractCFSchema.java
@@ -31,15 +31,15 @@ import org.apache.activemq.utils.uri.URISchema;
public abstract class AbstractCFSchema extends URISchema<ActiveMQConnectionFactory>
{
- protected ConnectionOptions newConectionOptions(URI uri, Map<String, String> query) throws Exception
+ protected JMSConnectionOptions newConectionOptions(URI uri, Map<String, String> query) throws Exception
{
String type = query.get("type");
// We do this check here to guarantee proper logging
- if (ConnectionOptions.convertCFType(type) == null)
+ if (JMSConnectionOptions.convertCFType(type) == null)
{
ActiveMQClientLogger.LOGGER.invalidCFType(type, uri.toString());
}
- return setData(uri, new ConnectionOptions(), query);
+ return setData(uri, new JMSConnectionOptions(), query);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/uri/ConnectionFactoryParser.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/uri/ConnectionFactoryParser.java b/activemq-jms-client/src/main/java/org/apache/activemq/uri/ConnectionFactoryParser.java
index 88ef45e..ec2e8db 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/uri/ConnectionFactoryParser.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/uri/ConnectionFactoryParser.java
@@ -28,7 +28,9 @@ public class ConnectionFactoryParser extends URIFactory<ActiveMQConnectionFactor
{
public ConnectionFactoryParser()
{
+ registerSchema(new TCPSchema());
registerSchema(new UDPSchema());
registerSchema(new JGroupsSchema());
+ registerSchema(new InVMSchema());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/uri/ConnectionOptions.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/uri/ConnectionOptions.java b/activemq-jms-client/src/main/java/org/apache/activemq/uri/ConnectionOptions.java
deleted file mode 100644
index c9a46b3..0000000
--- a/activemq-jms-client/src/main/java/org/apache/activemq/uri/ConnectionOptions.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.uri;
-
-import org.apache.activemq.api.jms.JMSFactoryType;
-
-/**
- * This will represent all the possible options you could setup on URLs
- * When parsing the URL this will serve as an intermediate object
- * And it could also be a pl
- * @author clebertsuconic
- */
-
-public class ConnectionOptions
-{
-
- private boolean ha;
-
- private JMSFactoryType factoryType = JMSFactoryType.CF;
-
- private String host;
-
- private int port;
-
- public ConnectionOptions setHost(String host)
- {
- this.host = host;
- return this;
- }
-
- public String getHost()
- {
- return host;
- }
-
-
- public ConnectionOptions setPort(int port)
- {
- this.port = port;
- return this;
- }
-
- public int getPort()
- {
- return port;
- }
-
- public boolean isHa()
- {
- return ha;
- }
-
- public void setHa(boolean ha)
- {
- this.ha = ha;
- }
-
- public JMSFactoryType getFactoryTypeEnum()
- {
- return factoryType;
- }
-
- public String getType()
- {
- return factoryType.toString();
- }
-
-
- public void setType(final String type)
- {
- this.factoryType = convertCFType(type);
- if (factoryType == null)
- {
- factoryType = JMSFactoryType.CF;
- }
- }
-
- public static JMSFactoryType convertCFType(String type)
- {
- try
- {
- if (type == null)
- {
- return null;
- }
- else
- {
- return Enum.valueOf(JMSFactoryType.class, type);
- }
- }
- catch (Exception e)
- {
- return null;
- }
- }
-
- @Override
- public String toString()
- {
- return "ConnectionOptions{" +
- "ha=" + ha +
- ", factoryType=" + factoryType +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/uri/InVMSchema.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/uri/InVMSchema.java b/activemq-jms-client/src/main/java/org/apache/activemq/uri/InVMSchema.java
new file mode 100644
index 0000000..4de3b24
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/uri/InVMSchema.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.uri;
+
+import org.apache.activemq.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.utils.uri.SchemaConstants;
+import org.apache.activemq.utils.uri.URISchema;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class InVMSchema extends AbstractCFSchema
+{
+ @Override
+ public String getSchemaName()
+ {
+ return SchemaConstants.VM;
+ }
+
+ @Override
+ protected ActiveMQConnectionFactory internalNewObject(URI uri, Map<String, String> query) throws Exception
+ {
+ JMSConnectionOptions options = newConectionOptions(uri, query);
+ ActiveMQConnectionFactory factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(options.getFactoryTypeEnum(), InVMServerLocatorSchema.createTransportConfiguration(uri));
+ return URISchema.setData(uri, factory, query);
+ }
+
+ @Override
+ protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception
+ {
+ return InVMServerLocatorSchema.getUri(bean.getStaticConnectors());
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/uri/JGroupsSchema.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/uri/JGroupsSchema.java b/activemq-jms-client/src/main/java/org/apache/activemq/uri/JGroupsSchema.java
index 302facd..4d92153 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/uri/JGroupsSchema.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/uri/JGroupsSchema.java
@@ -17,14 +17,17 @@
package org.apache.activemq.uri;
+import java.io.NotSerializableException;
import java.net.URI;
import java.util.Map;
-import java.util.UUID;
+import org.apache.activemq.api.core.BroadcastEndpointFactory;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
-import org.apache.activemq.api.core.JGroupsBroadcastGroupConfiguration;
+import org.apache.activemq.api.core.JGroupsFileBroadcastEndpointFactory;
+import org.apache.activemq.api.core.JGroupsPropertiesBroadcastEndpointFactory;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.utils.uri.SchemaConstants;
import org.apache.activemq.utils.uri.URISchema;
/**
@@ -36,34 +39,48 @@ public class JGroupsSchema extends AbstractCFSchema
@Override
public String getSchemaName()
{
- return "jgroups";
+ return SchemaConstants.JGROUPS;
}
-
@Override
public ActiveMQConnectionFactory internalNewObject(URI uri, Map<String, String> query) throws Exception
{
- ConnectionOptions options = newConectionOptions(uri, query);
-
- System.out.println("authority = " + uri.getAuthority() + " path = " + uri.getPath());
-
- JGroupsBroadcastGroupConfiguration jgroupsConfig = new JGroupsBroadcastGroupConfiguration(uri.getAuthority(), uri.getPath() != null ? uri.getPath() : UUID.randomUUID().toString());
-
- URISchema.setData(uri, jgroupsConfig, query);
-
-
- DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setBroadcastEndpointFactoryConfiguration(jgroupsConfig);
-
- URISchema.setData(uri, dcConfig, query);
+ JMSConnectionOptions options = newConectionOptions(uri, query);
+ DiscoveryGroupConfiguration dcConfig = JGroupsServerLocatorSchema.getDiscoveryGroupConfiguration(uri, query);
+ ActiveMQConnectionFactory factory;
if (options.isHa())
{
- return ActiveMQJMSClient.createConnectionFactoryWithHA(dcConfig, options.getFactoryTypeEnum());
+ factory = ActiveMQJMSClient.createConnectionFactoryWithHA(dcConfig, options.getFactoryTypeEnum());
+ }
+ else
+ {
+ factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dcConfig, options.getFactoryTypeEnum());
+ }
+ return URISchema.setData(uri, factory, query);
+ }
+
+ @Override
+ protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception
+ {
+ DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration();
+ BroadcastEndpointFactory endpoint = dgc.getBroadcastEndpointFactory();
+ String auth;
+ if (endpoint instanceof JGroupsFileBroadcastEndpointFactory)
+ {
+ auth = ((JGroupsFileBroadcastEndpointFactory) endpoint).getChannelName();
+ }
+ else if (endpoint instanceof JGroupsPropertiesBroadcastEndpointFactory)
+ {
+ auth = ((JGroupsPropertiesBroadcastEndpointFactory) endpoint).getChannelName();
}
else
{
- return ActiveMQJMSClient.createConnectionFactoryWithoutHA(dcConfig, options.getFactoryTypeEnum());
+ throw new NotSerializableException(endpoint + "not serializable");
}
+ String query = URISchema.getData(null, bean, dgc, endpoint);
+ dgc.setBroadcastEndpointFactory(endpoint);
+ return new URI(SchemaConstants.JGROUPS, null, auth, -1, null, query, null);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/uri/JMSConnectionOptions.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/uri/JMSConnectionOptions.java b/activemq-jms-client/src/main/java/org/apache/activemq/uri/JMSConnectionOptions.java
new file mode 100644
index 0000000..8855fd7
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/uri/JMSConnectionOptions.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.uri;
+
+import org.apache.activemq.api.jms.JMSFactoryType;
+
+/**
+ * This will represent all the possible options you could setup on URLs
+ * When parsing the URL this will serve as an intermediate object
+ * And it could also be a pl
+ * @author clebertsuconic
+ */
+
+public class JMSConnectionOptions extends ConnectionOptions
+{
+ private JMSFactoryType factoryType = JMSFactoryType.CF;
+
+ public JMSFactoryType getFactoryTypeEnum()
+ {
+ return factoryType;
+ }
+
+ public String getType()
+ {
+ return factoryType.toString();
+ }
+
+
+ public void setType(final String type)
+ {
+ this.factoryType = convertCFType(type);
+ if (factoryType == null)
+ {
+ factoryType = JMSFactoryType.CF;
+ }
+ }
+
+ public static JMSFactoryType convertCFType(String type)
+ {
+ try
+ {
+ if (type == null)
+ {
+ return JMSFactoryType.CF;
+ }
+ else
+ {
+ return Enum.valueOf(JMSFactoryType.class, type);
+ }
+ }
+ catch (Exception e)
+ {
+ return null;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JMSConnectionOptions{" +
+ ", factoryType=" + factoryType +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/uri/TCPSchema.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/uri/TCPSchema.java b/activemq-jms-client/src/main/java/org/apache/activemq/uri/TCPSchema.java
new file mode 100644
index 0000000..a9b7565
--- /dev/null
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/uri/TCPSchema.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.uri;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.utils.uri.SchemaConstants;
+import org.apache.activemq.utils.uri.URISchema;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class TCPSchema extends AbstractCFSchema
+{
+ @Override
+ public String getSchemaName()
+ {
+ return SchemaConstants.TCP;
+ }
+
+ @Override
+ protected ActiveMQConnectionFactory internalNewObject(URI uri, Map<String, String> query) throws Exception
+ {
+ JMSConnectionOptions options = newConectionOptions(uri, query);
+
+ TransportConfiguration[] configurations = TCPServerLocatorSchema.getTransportConfigurations(uri, query);
+
+ ActiveMQConnectionFactory factory;
+
+ if (options.isHa())
+ {
+ factory = ActiveMQJMSClient.createConnectionFactoryWithHA(options.getFactoryTypeEnum(), configurations);
+ }
+ else
+ {
+ factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(options.getFactoryTypeEnum(), configurations);
+ }
+
+ return URISchema.setData(uri, factory, query);
+ }
+
+ @Override
+ protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception
+ {
+ String query = URISchema.getData(null, bean);
+ TransportConfiguration[] staticConnectors = bean.getStaticConnectors();
+ return TCPServerLocatorSchema.getURI(query, staticConnectors);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3b76ccc9/activemq-jms-client/src/main/java/org/apache/activemq/uri/UDPSchema.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/uri/UDPSchema.java b/activemq-jms-client/src/main/java/org/apache/activemq/uri/UDPSchema.java
index b604233..fdf613b 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/uri/UDPSchema.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/uri/UDPSchema.java
@@ -17,15 +17,14 @@
package org.apache.activemq.uri;
-import java.io.PrintStream;
import java.net.URI;
import java.util.Map;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
-import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
+import org.apache.activemq.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
-import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.utils.uri.SchemaConstants;
import org.apache.activemq.utils.uri.URISchema;
/**
@@ -37,29 +36,35 @@ public class UDPSchema extends AbstractCFSchema
@Override
public String getSchemaName()
{
- return "udp";
+ return SchemaConstants.UDP;
}
-
@Override
public ActiveMQConnectionFactory internalNewObject(URI uri, Map<String, String> query) throws Exception
{
- ConnectionOptions options = newConectionOptions(uri, query);
+ JMSConnectionOptions options = newConectionOptions(uri, query);
- DiscoveryGroupConfiguration dgc = URISchema.setData(uri, new DiscoveryGroupConfiguration(), query)
- .setBroadcastEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration()
- .setGroupAddress(getHost(uri))
- .setGroupPort(getPort(uri))
- .setLocalBindAddress(query.containsKey(TransportConstants.LOCAL_ADDRESS_PROP_NAME) ? (String) query.get(TransportConstants.LOCAL_ADDRESS_PROP_NAME) : null)
- .setLocalBindPort(query.containsKey(TransportConstants.LOCAL_PORT_PROP_NAME) ? Integer.parseInt((String) query.get(TransportConstants.LOCAL_PORT_PROP_NAME)) : -1));
+ DiscoveryGroupConfiguration dgc = UDPServerLocatorSchema.getDiscoveryGroupConfiguration(uri, query, getHost(uri), getPort(uri));
+ ActiveMQConnectionFactory factory;
if (options.isHa())
{
- return ActiveMQJMSClient.createConnectionFactoryWithHA(dgc, options.getFactoryTypeEnum());
+ factory = ActiveMQJMSClient.createConnectionFactoryWithHA(dgc, options.getFactoryTypeEnum());
}
else
{
- return ActiveMQJMSClient.createConnectionFactoryWithoutHA(dgc, options.getFactoryTypeEnum());
+ factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dgc, options.getFactoryTypeEnum());
}
+ return URISchema.setData(uri, factory, query);
+ }
+
+ @Override
+ protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception
+ {
+ DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration();
+ UDPBroadcastEndpointFactory endpoint = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory();
+ String query = URISchema.getData(UDPServerLocatorSchema.IGNORED, bean, dgc, endpoint);
+ dgc.setBroadcastEndpointFactory(endpoint);
+ return new URI(SchemaConstants.UDP, null, endpoint.getGroupAddress(), endpoint.getGroupPort(), null, query, null);
}
}