You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by cs...@apache.org on 2017/08/03 14:35:15 UTC
[03/15] karaf git commit: [KARAF-5131] XA + JMS support
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java
----------------------------------------------------------------------
diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java
deleted file mode 100644
index b3ea458..0000000
--- a/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.internal;
-
-import org.apache.karaf.util.json.JsonReader;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueRequestor;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import java.io.StringReader;
-import java.util.Collections;
-import java.util.List;
-
-class ArtemisDestinationSourceFactory implements DestinationSource.Factory {
-
- @Override
- public DestinationSource create(Connection connection) throws JMSException {
- if (connection.getClass().getName().matches("org\\.apache\\.activemq\\.artemis\\.jms\\.client\\.ActiveMQ(XA)?Connection")) {
- return type -> getNames(connection, type);
- }
- return null;
- }
-
- private List<String> getNames(Connection connection, DestinationSource.DestinationType type) {
- try {
- QueueSession session = ((QueueConnection) connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue managementQueue = session.createQueue("activemq.management");
- QueueRequestor requestor = new QueueRequestor(session, managementQueue);
- connection.start();
- TextMessage m = session.createTextMessage();
- m.setStringProperty("_AMQ_ResourceName", "broker");
- m.setStringProperty("_AMQ_OperationName", "getQueueNames");
- String routing = type == DestinationSource.DestinationType.Queue ? "ANYCAST" : "MULTICAST";
- m.setText("[\"" + routing + "\"]");
- Message reply = requestor.request(m);
- String json = ((TextMessage) reply).getText();
- List<?> array = (List<?>) JsonReader.read(new StringReader(json));
- return (List<String>) array.get(0);
- } catch (Exception e) {
- return Collections.emptyList();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java
----------------------------------------------------------------------
diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java
deleted file mode 100644
index efc7bcd..0000000
--- a/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.internal;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import java.util.List;
-
-interface DestinationSource {
-
- enum DestinationType {
- Queue, Topic
- }
-
- interface Factory {
-
- DestinationSource create(Connection connection) throws JMSException;
- }
-
- List<String> getNames(DestinationType type);
-}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java
----------------------------------------------------------------------
diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java
deleted file mode 100644
index 75c52ce..0000000
--- a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.internal;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Comparator;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceReference;
-
-public class JmsConnector implements Closeable {
- private BundleContext bc;
- private ServiceReference<ConnectionFactory> reference;
- private Connection connection;
- private Session session;
- private String connectionFactoryName;
- private String username;
- private String password;
-
- public JmsConnector(BundleContext bc, String connectionFactoryName, String username, String password) throws JMSException {
- this.bc = bc;
- this.connectionFactoryName = connectionFactoryName;
- this.username = username;
- this.password = password;
- }
-
- private ServiceReference<ConnectionFactory> lookupConnectionFactory(String name) {
- try {
- Collection<ServiceReference<ConnectionFactory>> references = bc.getServiceReferences(
- ConnectionFactory.class,
- "(|(osgi.jndi.service.name=" + name + ")(name=" + name + ")(service.id=" + name + "))");
- return references.stream()
- .sorted(Comparator.<ServiceReference<?>>naturalOrder().reversed())
- .findFirst()
- .orElseThrow(() -> new IllegalArgumentException("No JMS connection factory found for " + name));
- } catch (InvalidSyntaxException e) {
- throw new RuntimeException("Error finding connection factory service " + name, e);
- }
- }
-
- @Override
- public void close() throws IOException {
- if (session != null) {
- try {
- session.close();
- } catch (JMSException e) {
- // Ignore
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException e) {
- // Ignore
- }
- }
- if (reference != null) {
- bc.ungetService(reference);
- }
- }
-
- public Connection connect() throws JMSException {
- reference = this.lookupConnectionFactory(connectionFactoryName);
- ConnectionFactory cf = bc.getService(reference);
- connection = cf.createConnection(username, password);
- connection.start();
- return connection;
- }
-
- public Session createSession() throws JMSException {
- return createSession(Session.AUTO_ACKNOWLEDGE);
- }
-
- public Session createSession(int acknowledgeMode) throws JMSException {
- if (connection == null) {
- connect();
- }
- if (acknowledgeMode == Session.SESSION_TRANSACTED) {
- session = connection.createSession(true, acknowledgeMode);
- } else {
- session = connection.createSession(false, acknowledgeMode);
- }
- return session;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java
----------------------------------------------------------------------
diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java
deleted file mode 100644
index e3b7801..0000000
--- a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.internal;
-
-import org.apache.karaf.jms.JmsMBean;
-import org.apache.karaf.jms.JmsMessage;
-import org.apache.karaf.jms.JmsService;
-
-import javax.management.MBeanException;
-import javax.management.openmbean.*;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Default implementation of the JMS MBean.
- */
-public class JmsMBeanImpl implements JmsMBean {
-
- private JmsService jmsService;
-
- @Override
- public List<String> getConnectionfactories() throws MBeanException {
- try {
- return jmsService.connectionFactories();
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- @Override
- public void create(String name, String type, String url) throws MBeanException {
- try {
- jmsService.create(name, type, url);
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- @Override
- public void create(String name, String type, String url, String username, String password) throws MBeanException {
- try {
- jmsService.create(name, type, url, username, password);
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- @Override
- public void delete(String name) throws MBeanException {
- try {
- jmsService.delete(name);
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- @Override
- public Map<String, String> info(String connectionFactory, String username, String password) throws MBeanException {
- try {
- return jmsService.info(connectionFactory, username, password);
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- @Override
- public int count(String connectionFactory, String queue, String username, String password) throws MBeanException {
- try {
- return jmsService.count(connectionFactory, queue, username, password);
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- @Override
- public List<String> queues(String connectionFactory, String username, String password) throws MBeanException {
- try {
- return jmsService.queues(connectionFactory, username, password);
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- @Override
- public List<String> topics(String connectionFactory, String username, String password) throws MBeanException {
- try {
- return jmsService.topics(connectionFactory, username, password);
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- @Override
- public void send(String connectionFactory, String queue, String content, String replyTo, String username, String password) throws MBeanException {
- try {
- jmsService.send(connectionFactory, queue, content, replyTo, username, password);
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- @Override
- public int consume(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException {
- try {
- return jmsService.consume(connectionFactory, queue, selector, username, password);
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- @Override
- public int move(String connectionFactory, String source, String destination, String selector, String username, String password) throws MBeanException {
- try {
- return jmsService.move(connectionFactory, source, destination, selector, username, password);
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- @Override
- public TabularData browse(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException {
- try {
- CompositeType type = new CompositeType("message", "JMS Message",
- new String[]{ "id", "content", "charset", "type", "correlation", "delivery", "destination", "expiration", "priority", "redelivered", "replyto", "timestamp" },
- new String[]{ "Message ID", "Content", "Charset", "Type", "Correlation ID", "Delivery Mode", "Destination", "Expiration Date", "Priority", "Redelivered", "Reply-To", "Timestamp" },
- new OpenType[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.STRING });
- TabularType tableType = new TabularType("messages", "JMS Messages", type, new String[]{ "id" });
- TabularData table = new TabularDataSupport(tableType);
- for (JmsMessage message : getJmsService().browse(connectionFactory, queue, selector, username, password)) {
- CompositeData data = new CompositeDataSupport(type,
- new String[]{ "id", "content", "charset", "type", "correlation", "delivery", "destination", "expiration", "priority", "redelivered", "replyto", "timestamp" },
- new Object[]{ message.getMessageId(), message.getContent(), message.getCharset(), message.getType(), message.getCorrelationID(), message.getDeliveryMode(), message.getDestination(), message.getExpiration(), message.getPriority(), message.isRedelivered(), message.getReplyTo(), message.getTimestamp() }
- );
- table.put(data);
- }
- return table;
- } catch (Throwable t) {
- throw new MBeanException(null, t.getMessage());
- }
- }
-
- public JmsService getJmsService() {
- return jmsService;
- }
-
- public void setJmsService(JmsService jmsService) {
- this.jmsService = jmsService;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java
----------------------------------------------------------------------
diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java
deleted file mode 100644
index f460018..0000000
--- a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.internal;
-
-import org.apache.karaf.jms.JmsMessage;
-import org.apache.karaf.jms.JmsService;
-import org.apache.karaf.util.TemplateUtils;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceReference;
-
-import javax.jms.*;
-
-import java.io.*;
-import java.lang.IllegalStateException;
-import java.lang.reflect.Method;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.*;
-import java.util.stream.Collectors;
-
-/**
- * Default implementation of the JMS Service.
- */
-public class JmsServiceImpl implements JmsService {
-
- private BundleContext bundleContext;
- private Path deployFolder;
-
- public JmsServiceImpl() {
- deployFolder = Paths.get(System.getProperty("karaf.base"), "deploy");
- }
-
- @Override
- public void create(String name, String type, String url) throws Exception {
- create(name, type, url, null, null);
- }
-
- @Override
- public void create(String name, String type, String url, String username, String password) throws Exception {
- if (!type.equalsIgnoreCase("activemq")
- && !type.equalsIgnoreCase("artemis")
- && !type.equalsIgnoreCase("webspheremq")) {
- throw new IllegalArgumentException("JMS connection factory type not known");
- }
-
- Path outFile = getConnectionFactoryFile(name);
- String template;
- HashMap<String, String> properties = new HashMap<>();
- properties.put("name", name);
-
- if (type.equalsIgnoreCase("activemq")) {
- // activemq
- properties.put("url", url);
- properties.put("username", username);
- properties.put("password", password);
- template = "connectionfactory-activemq.xml";
- } else if (type.equalsIgnoreCase("artemis")) {
- // artemis
- properties.put("url", url);
- properties.put("username", username);
- properties.put("password", password);
- template = "connectionfactory-artemis.xml";
- } else {
- // webspheremq
- String[] splitted = url.split("/");
- if (splitted.length != 4) {
- throw new IllegalStateException("WebsphereMQ URI should be in the following format: host/port/queuemanager/channel");
- }
-
- properties.put("host", splitted[0]);
- properties.put("port", splitted[1]);
- properties.put("queuemanager", splitted[2]);
- properties.put("channel", splitted[3]);
- template = "connectionfactory-webspheremq.xml";
- }
- InputStream is = this.getClass().getResourceAsStream(template);
- if (is == null) {
- throw new IllegalArgumentException("Template resource " + template + " doesn't exist");
- }
- TemplateUtils.createFromTemplate(outFile.toFile(), is, properties);
- }
-
- private Path getConnectionFactoryFile(String name) {
- return deployFolder.resolve("connectionfactory-" + name + ".xml");
- }
-
- @Override
- public void delete(String name) throws Exception {
- Path connectionFactoryFile = getConnectionFactoryFile(name);
- if (!Files.isRegularFile(connectionFactoryFile)) {
- throw new IllegalStateException("The JMS connection factory file " + connectionFactoryFile + " doesn't exist");
- }
- Files.delete(connectionFactoryFile);
- }
-
- @Override
- public List<String> connectionFactories() throws Exception {
- return bundleContext.getServiceReferences(ConnectionFactory.class, null).stream()
- .map(this::getConnectionFactoryName)
- .distinct()
- .collect(Collectors.toList());
- }
-
- private String getConnectionFactoryName(ServiceReference<ConnectionFactory> reference) {
- if (reference.getProperty("osgi.jndi.service.name") != null) {
- return (String) reference.getProperty("osgi.jndi.service.name");
- } else if (reference.getProperty("name") != null) {
- return (String) reference.getProperty("name");
- } else {
- return reference.getProperty(Constants.SERVICE_ID).toString();
- }
- }
-
- @Override
- public List<String> connectionFactoryFileNames() throws Exception {
- return Files.list(deployFolder)
- .map(Path::getFileName)
- .map(Path::toString)
- .filter(name -> name.startsWith("connectionfactory-") && name.endsWith(".xml"))
- .collect(Collectors.toList());
- }
-
- @Override
- public Map<String, String> info(String connectionFactory, String username, String password) throws IOException, JMSException {
- try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) {
- ConnectionMetaData metaData = connector.connect().getMetaData();
- Map<String, String> map = new HashMap<>();
- map.put("product", metaData.getJMSProviderName());
- map.put("version", metaData.getProviderVersion());
- return map;
- }
- }
-
- @Override
- public int count(String connectionFactory, final String destination, String username, String password) throws IOException, JMSException {
- try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) {
- Session session = connector.createSession();
- QueueBrowser browser = session.createBrowser(session.createQueue(destination));
- @SuppressWarnings("unchecked")
- Enumeration<Message> enumeration = browser.getEnumeration();
- int count = 0;
- while (enumeration.hasMoreElements()) {
- enumeration.nextElement();
- count++;
- }
- browser.close();
- return count;
- }
- }
-
- private DestinationSource getDestinationSource(Connection connection) throws JMSException {
- while (true) {
- try {
- Method mth = connection.getClass().getMethod("getConnection");
- connection = (Connection) mth.invoke(connection);
- } catch (Throwable e) {
- break;
- }
- }
- List<DestinationSource.Factory> factories = Arrays.asList(
- new ActiveMQDestinationSourceFactory(),
- new ArtemisDestinationSourceFactory()
- );
- DestinationSource source = null;
- for (DestinationSource.Factory factory : factories) {
- source = factory.create(connection);
- if (source != null) {
- break;
- }
- }
- if (source == null) {
- source = d -> Collections.emptyList();
- }
- return source;
- }
-
- @Override
- public List<String> queues(String connectionFactory, String username, String password) throws JMSException, IOException {
- try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) {
- return getDestinationSource(connector.connect()).getNames(DestinationSource.DestinationType.Queue);
- }
- }
-
- @Override
- public List<String> topics(String connectionFactory, String username, String password) throws IOException, JMSException {
- try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) {
- return getDestinationSource(connector.connect()).getNames(DestinationSource.DestinationType.Topic);
- }
- }
-
- @Override
- public List<JmsMessage> browse(String connectionFactory, final String queue, final String filter,
- String username, String password) throws JMSException, IOException {
- try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) {
- List<JmsMessage> messages = new ArrayList<>();
- Session session = connector.createSession();
- QueueBrowser browser = session.createBrowser(session.createQueue(queue), filter);
- @SuppressWarnings("unchecked")
- Enumeration<Message> enumeration = browser.getEnumeration();
- while (enumeration.hasMoreElements()) {
- Message message = enumeration.nextElement();
-
- messages.add(new JmsMessage(message));
- }
- browser.close();
- return messages;
- }
- }
-
- @Override
- public void send(String connectionFactory, final String queue, final String body, final String replyTo,
- String username, String password) throws IOException, JMSException {
- try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) {
- Session session = connector.createSession();
- Message message = session.createTextMessage(body);
- if (replyTo != null) {
- message.setJMSReplyTo(session.createQueue(replyTo));
- }
- MessageProducer producer = session.createProducer(session.createQueue(queue));
- producer.send(message);
- producer.close();
- }
- }
-
- @Override
- public int consume(String connectionFactory, final String queue, final String selector, String username,
- String password) throws Exception {
- try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) {
- int count = 0;
- Session session = connector.createSession();
- MessageConsumer consumer = session.createConsumer(session.createQueue(queue), selector);
- Message message;
- do {
- message = consumer.receive(500L);
- if (message != null) {
- count++;
- }
- } while (message != null);
- return count;
- }
- }
-
- @Override
- public int move(String connectionFactory, final String sourceQueue, final String targetQueue,
- final String selector, String username, String password) throws IOException, JMSException {
- try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) {
- int count = 0;
- Session session = connector.createSession(Session.SESSION_TRANSACTED);
- MessageConsumer consumer = session.createConsumer(session.createQueue(sourceQueue), selector);
- Message message;
- do {
- message = consumer.receive(500L);
- if (message != null) {
- MessageProducer producer = session.createProducer(session.createQueue(targetQueue));
- producer.send(message);
- count++;
- }
- } while (message != null);
- session.commit();
- consumer.close();
- return count;
- }
- }
-
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java
----------------------------------------------------------------------
diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java
deleted file mode 100644
index 1dd1c09..0000000
--- a/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.apache.karaf.jms.internal.osgi;
-
-import org.apache.karaf.jms.JmsService;
-import org.apache.karaf.jms.internal.JmsMBeanImpl;
-import org.apache.karaf.jms.internal.JmsServiceImpl;
-import org.apache.karaf.shell.api.console.CommandLoggingFilter;
-import org.apache.karaf.shell.support.RegexCommandLoggingFilter;
-import org.apache.karaf.util.tracker.BaseActivator;
-import org.apache.karaf.util.tracker.annotation.ProvideService;
-import org.apache.karaf.util.tracker.annotation.Services;
-
-@Services(
- provides = @ProvideService(JmsService.class)
-)
-public class Activator extends BaseActivator {
- @Override
- protected void doStart() throws Exception {
- JmsServiceImpl service = new JmsServiceImpl();
- service.setBundleContext(bundleContext);
- register(JmsService.class, service);
-
- JmsMBeanImpl mbean = new JmsMBeanImpl();
- mbean.setJmsService(service);
- registerMBean(mbean, "type=jms");
-
- RegexCommandLoggingFilter filter = new RegexCommandLoggingFilter();
- filter.addRegEx("create +.*?--password ([^ ]+)", 2);
- filter.addRegEx("create +.*?-p ([^ ]+)", 2);
- register(CommandLoggingFilter.class, filter);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/resources/OSGI-INF/bundle.info
----------------------------------------------------------------------
diff --git a/jms/core/src/main/resources/OSGI-INF/bundle.info b/jms/core/src/main/resources/OSGI-INF/bundle.info
deleted file mode 100644
index 9d83749..0000000
--- a/jms/core/src/main/resources/OSGI-INF/bundle.info
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-h1. Synopsis
-
- ${project.name}
-
- ${project.description}
-
- Maven URL:
- [mvn:${project.groupId}/${project.artifactId}/${project.version}]
-
-h1. Description
-
- This bundle is the core implementation of the JMS service support.
-
- The JMS service allows you to create connection factories, and send/browse/consume messages.
-
-h1. Commands
-
- The bundle contains the following commands:
-\${command-list|jms|indent=8,list,cyan}
-
-h1. See also
-
- JMS - section of the Karaf User Guide
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml
----------------------------------------------------------------------
diff --git a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml
deleted file mode 100644
index da2ad1a..0000000
--- a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version
- 2.0 (the "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- CONDITIONS OF ANY KIND, either express or implied. See the License for
- the specific language governing permissions and limitations under the
- License.
- -->
-<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
-
- <service interface="javax.jms.ConnectionFactory">
- <service-properties>
- <entry key="name" value="${name}" />
- <entry key="osgi.jndi.service.name" value="jms/${name}" />
- <entry key="karaf.jms.wrap" value="true" />
- <entry key="karaf.jms.pool.maxConnections" value="8" />
- </service-properties>
- <bean class="org.apache.activemq.ActiveMQConnectionFactory">
- <property name="brokerURL" value="${url}" />
- <property name="userName" value="${username}" />
- <property name="password" value="${password}" />
- </bean>
- </service>
-
-</blueprint>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml
----------------------------------------------------------------------
diff --git a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml
deleted file mode 100644
index 67b1f54..0000000
--- a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml
+++ /dev/null
@@ -1,35 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version
- 2.0 (the "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- CONDITIONS OF ANY KIND, either express or implied. See the License for
- the specific language governing permissions and limitations under the
- License.
- -->
-<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
-
- <service interface="javax.jms.ConnectionFactory">
- <service-properties>
- <entry key="name" value="${name}" />
- <entry key="osgi.jndi.service.name" value="jms/${name}" />
- <entry key="karaf.jms.wrap" value="true" />
- <entry key="karaf.jms.pool.maxConnections" value="8" />
- </service-properties>
- <bean class="org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory">
- <argument value="${url}" />
- <argument value="${username}" />
- <argument value="${password}" />
- <property name="producerWindowSize" value="-1" />
- </bean>
- </service>
-
-</blueprint>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml
----------------------------------------------------------------------
diff --git a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml
deleted file mode 100644
index 999c85b..0000000
--- a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml
+++ /dev/null
@@ -1,35 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version
- 2.0 (the "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- CONDITIONS OF ANY KIND, either express or implied. See the License for
- the specific language governing permissions and limitations under the
- License.
- -->
-<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
-
- <service interface="javax.jms.ConnectionFactory">
- <service-properties>
- <entry key="name" value="${name}"/>
- <entry key="osgi.jndi.service.name" value="jms/${name}"/>
- <entry key="karaf.jms.wrap" value="true" />
- </service-properties>
- <bean class="com.ibm.mq.jms.MQQueueConnectionFactory">
- <property name="transportType" value="1" />
- <property name="hostName" value="${hostname}" />
- <property name="port" value="${port}" />
- <property name="queueManager" value="${queuemanager}" />
- <property name="channel" value="${channel}" />
- </bean>
- </service>
-
-</blueprint>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pom.xml
----------------------------------------------------------------------
diff --git a/jms/pom.xml b/jms/pom.xml
index 3b3b185..dd7155c 100644
--- a/jms/pom.xml
+++ b/jms/pom.xml
@@ -25,16 +25,109 @@
<groupId>org.apache.karaf</groupId>
<artifactId>karaf</artifactId>
<version>4.2.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
+ <relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.karaf.jms</groupId>
- <artifactId>parent</artifactId>
- <packaging>pom</packaging>
- <name>Apache Karaf :: Features</name>
-
- <modules>
- <module>core</module>
- <module>pool</module>
- </modules>
+ <artifactId>org.apache.karaf.jms.core</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache Karaf :: JMS :: Core</name>
+ <description>This bundle provides core implementation of the JMS service.</description>
+
+ <properties>
+ <appendedResourcesDirectory>${basedir}/../etc/appended-resources</appendedResourcesDirectory>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.jms</groupId>
+ <artifactId>javax.jms-api</artifactId>
+ <version>2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.jms</groupId>
+ <artifactId>pax-jms-api</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-pool</artifactId>
+ <version>5.9.0</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jta_1.0.1B_spec</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.karaf</groupId>
+ <artifactId>org.apache.karaf.util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.karaf.shell</groupId>
+ <artifactId>org.apache.karaf.shell.core</artifactId>
+ <optional>true</optional>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>${project.basedir}/src/main/resources</directory>
+ <includes>
+ <include>**/*</include>
+ </includes>
+ </resource>
+ <resource>
+ <directory>${project.basedir}/src/main/resources</directory>
+ <filtering>true</filtering>
+ <includes>
+ <include>**/*.info</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.karaf.tooling</groupId>
+ <artifactId>karaf-services-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Export-Package>
+ org.apache.karaf.jms;-noimport:=true
+ </Export-Package>
+ <Import-Package>
+ javax.jms;version="[1.1,3)",
+ org.apache.activemq*;resolution:=optional,
+ *
+ </Import-Package>
+ <Private-Package>
+ org.apache.karaf.jms.command,
+ org.apache.karaf.jms.command.completers,
+ org.apache.karaf.jms.internal,
+ org.apache.karaf.jms.internal.osgi,
+ org.apache.karaf.util,
+ org.apache.karaf.util.json
+ </Private-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/pom.xml
----------------------------------------------------------------------
diff --git a/jms/pool/pom.xml b/jms/pool/pom.xml
deleted file mode 100644
index 21278d2..0000000
--- a/jms/pool/pom.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <!--
-
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.karaf</groupId>
- <artifactId>karaf</artifactId>
- <version>4.2.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.karaf.jms</groupId>
- <artifactId>org.apache.karaf.jms.pool</artifactId>
- <packaging>bundle</packaging>
- <name>Apache Karaf :: JMS :: Pool</name>
- <description>This bundle provides pooling implementation of the JMS service.</description>
-
- <properties>
- <appendedResourcesDirectory>${basedir}/../../etc/appended-resources</appendedResourcesDirectory>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.osgi</groupId>
- <artifactId>org.osgi.core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-pool2</artifactId>
- <version>2.4.2</version>
- </dependency>
- </dependencies>
-
- <build>
- <resources>
- <resource>
- <directory>${project.basedir}/src/main/resources</directory>
- <includes>
- <include>**/*</include>
- </includes>
- </resource>
- <resource>
- <directory>${project.basedir}/src/main/resources</directory>
- <filtering>true</filtering>
- <includes>
- <include>**/*.info</include>
- </includes>
- </resource>
- </resources>
- <plugins>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <configuration>
- <instructions>
- <Export-Package>
- </Export-Package>
- <Import-Package>
- javax.jms;version="[1.1,3)",
- *
- </Import-Package>
- <Private-Package>
- org.apache.karaf.jms.pool.internal,
- org.apache.karaf.jms.pool.internal.osgi
- </Private-Package>
- <Bundle-Activator>
- org.apache.karaf.jms.pool.internal.osgi.Activator
- </Bundle-Activator>
- </instructions>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java
deleted file mode 100644
index 9dab2fc..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-/**
- * A cache key for the connection details
- *
- *
- */
-public class ConnectionKey {
- private String userName;
- private String password;
- private int hash;
-
- public ConnectionKey(String userName, String password) {
- this.password = password;
- this.userName = userName;
- hash = 31;
- if (userName != null) {
- hash += userName.hashCode();
- }
- hash *= 31;
- if (password != null) {
- hash += password.hashCode();
- }
- }
-
- public int hashCode() {
- return hash;
- }
-
- public boolean equals(Object that) {
- if (this == that) {
- return true;
- }
- if (that instanceof ConnectionKey) {
- return equals((ConnectionKey)that);
- }
- return false;
- }
-
- public boolean equals(ConnectionKey that) {
- return isEqual(this.userName, that.userName) && isEqual(this.password, that.password);
- }
-
- public String getPassword() {
- return password;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public static boolean isEqual(Object o1, Object o2) {
- if (o1 == o2) {
- return true;
- }
- return o1 != null && o2 != null && o1.equals(o2);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java
deleted file mode 100644
index fbf0384..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-import org.apache.commons.pool2.KeyedPooledObjectFactory;
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-
-import javax.jms.Connection;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Holds a real JMS connection along with the session pools associated with it.
- * <p/>
- * Instances of this class are shared amongst one or more PooledConnection object and must
- * track the session objects that are loaned out for cleanup on close as well as ensuring
- * that the temporary destinations of the managed Connection are purged when all references
- * to this ConnectionPool are released.
- */
-public class ConnectionPool {
- protected Connection connection;
- private int referenceCount;
- private long lastUsed = System.currentTimeMillis();
- private final long firstUsed = lastUsed;
- private boolean hasExpired;
- private int idleTimeout = 30 * 1000;
- private long expiryTimeout = 0l;
- private boolean useAnonymousProducers = true;
-
- private final AtomicBoolean started = new AtomicBoolean(false);
- private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
- private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<>();
-
- public ConnectionPool(Connection connection) {
-
- this.connection = connection;
-
- // Create our internal Pool of session instances.
- this.sessionPool = new GenericKeyedObjectPool<>(
- new KeyedPooledObjectFactory<SessionKey, PooledSession>() {
-
- @Override
- public void activateObject(SessionKey key, PooledObject<PooledSession> session) throws Exception {
- ConnectionPool.this.loanedSessions.add(session.getObject());
- }
-
- @Override
- public void destroyObject(SessionKey key, PooledObject<PooledSession> session) throws Exception {
- ConnectionPool.this.loanedSessions.remove(session.getObject());
- session.getObject().getInternalSession().close();
- }
-
- @Override
- public PooledObject<PooledSession> makeObject(SessionKey key) throws Exception {
- Session session = makeSession(key);
- return new DefaultPooledObject<>(new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers));
- }
-
- @Override
- public void passivateObject(SessionKey key, PooledObject<PooledSession> session) throws Exception {
- ConnectionPool.this.loanedSessions.remove(session.getObject());
- }
-
- @Override
- public boolean validateObject(SessionKey key, PooledObject<PooledSession> session) {
- return true;
- }
- }
- );
- }
-
- // useful when external failure needs to force expiry
- public void setHasExpired(boolean val) {
- hasExpired = val;
- }
-
- protected Session makeSession(SessionKey key) throws JMSException {
- return connection.createSession(key.isTransacted(), key.getAckMode());
- }
-
- public void start() throws JMSException {
- if (started.compareAndSet(false, true)) {
- try {
- connection.start();
- } catch (JMSException e) {
- started.set(false);
- throw(e);
- }
- }
- }
-
- public synchronized Connection getConnection() {
- return connection;
- }
-
- public Session createSession(boolean transacted, int ackMode) throws JMSException {
- SessionKey key = new SessionKey(transacted, ackMode);
- PooledSession session;
- try {
- session = sessionPool.borrowObject(key);
- } catch (Exception e) {
- IllegalStateException illegalStateException = new IllegalStateException(e.toString());
- illegalStateException.initCause(e);
- throw illegalStateException;
- }
- return session;
- }
-
- public synchronized void close() {
- if (connection != null) {
- try {
- sessionPool.close();
- } catch (Exception e) {
- } finally {
- try {
- connection.close();
- } catch (Exception e) {
- } finally {
- connection = null;
- }
- }
- }
- }
-
- public synchronized void incrementReferenceCount() {
- referenceCount++;
- lastUsed = System.currentTimeMillis();
- }
-
- public synchronized void decrementReferenceCount() {
- referenceCount--;
- lastUsed = System.currentTimeMillis();
- if (referenceCount == 0) {
- // Loaned sessions are those that are active in the sessionPool and
- // have not been closed by the client before closing the connection.
- // These need to be closed so that all session's reflect the fact
- // that the parent Connection is closed.
- for (PooledSession session : this.loanedSessions) {
- try {
- session.close();
- } catch (Exception e) {
- }
- }
- this.loanedSessions.clear();
-
- expiredCheck();
- }
- }
-
- /**
- * Determines if this Connection has expired.
- * <p/>
- * A ConnectionPool is considered expired when all references to it are released AND either
- * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed.
- * Once a ConnectionPool is determined to have expired its underlying Connection is closed.
- *
- * @return true if this connection has expired.
- */
- public synchronized boolean expiredCheck() {
-
- boolean expired = false;
-
- if (connection == null) {
- return true;
- }
-
- if (hasExpired) {
- if (referenceCount == 0) {
- close();
- expired = true;
- }
- }
-
- if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
- hasExpired = true;
- if (referenceCount == 0) {
- close();
- expired = true;
- }
- }
-
- // Only set hasExpired here is no references, as a Connection with references is by
- // definition not idle at this time.
- if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) {
- hasExpired = true;
- close();
- expired = true;
- }
-
- return expired;
- }
-
- public int getIdleTimeout() {
- return idleTimeout;
- }
-
- public void setIdleTimeout(int idleTimeout) {
- this.idleTimeout = idleTimeout;
- }
-
- public void setExpiryTimeout(long expiryTimeout) {
- this.expiryTimeout = expiryTimeout;
- }
-
- public long getExpiryTimeout() {
- return expiryTimeout;
- }
-
- public int getMaximumActiveSessionPerConnection() {
- return this.sessionPool.getMaxTotalPerKey();
- }
-
- public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
- this.sessionPool.setMaxTotalPerKey(maximumActiveSessionPerConnection);
- }
-
- public boolean isUseAnonymousProducers() {
- return this.useAnonymousProducers;
- }
-
- public void setUseAnonymousProducers(boolean value) {
- this.useAnonymousProducers = value;
- }
-
- /**
- * @return the total number of Pooled session including idle sessions that are not
- * currently loaned out to any client.
- */
- public int getNumSessions() {
- return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive();
- }
-
- /**
- * @return the total number of Sessions that are in the Session pool but not loaned out.
- */
- public int getNumIdleSessions() {
- return this.sessionPool.getNumIdle();
- }
-
- /**
- * @return the total number of Session's that have been loaned to PooledConnection instances.
- */
- public int getNumActiveSessions() {
- return this.sessionPool.getNumActive();
- }
-
- /**
- * Configure whether the createSession method should block when there are no more idle sessions and the
- * pool already contains the maximum number of active sessions. If false the create method will fail
- * and throw an exception.
- *
- * @param block
- * Indicates whether blocking should be used to wait for more space to create a session.
- */
- public void setBlockIfSessionPoolIsFull(boolean block) {
- this.sessionPool.setBlockWhenExhausted(block);
- }
-
- public boolean isBlockIfSessionPoolIsFull() {
- return this.sessionPool.getBlockWhenExhausted();
- }
-
- /**
- * Returns the timeout to use for blocking creating new sessions
- *
- * @return true if the pooled Connection createSession method will block when the limit is hit.
- * @see #setBlockIfSessionPoolIsFull(boolean)
- */
- public long getBlockIfSessionPoolIsFullTimeout() {
- return this.sessionPool.getMaxWaitMillis();
- }
-
- /**
- * Controls the behavior of the internal session pool. By default the call to
- * Connection.getSession() will block if the session pool is full. This setting
- * will affect how long it blocks and throws an exception after the timeout.
- *
- * The size of the session pool is controlled by the @see #maximumActive
- * property.
- *
- * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull
- * property
- *
- * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true,
- * then use this setting to configure how long to block before retry
- */
- public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
- this.sessionPool.setMaxWaitMillis(blockIfSessionPoolIsFullTimeout);
- }
-
- @Override
- public String toString() {
- return "ConnectionPool[" + connection + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java
deleted file mode 100755
index c5900d1..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLServerSocket;
-import java.lang.reflect.Method;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public final class IntrospectionSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(IntrospectionSupport.class);
-
- private IntrospectionSupport() {
- }
-
- @SuppressWarnings("rawtypes")
- public static boolean setProperties(Object target, Map props) {
- boolean rc = false;
-
- if (target == null) {
- throw new IllegalArgumentException("target was null.");
- }
- if (props == null) {
- throw new IllegalArgumentException("props was null.");
- }
-
- for (Iterator<?> iter = props.entrySet().iterator(); iter.hasNext();) {
- Entry<?,?> entry = (Entry<?,?>)iter.next();
- if (setProperty(target, (String)entry.getKey(), entry.getValue())) {
- iter.remove();
- rc = true;
- }
- }
-
- return rc;
- }
-
- public static boolean setProperty(Object target, String name, Object value) {
- try {
- Class<?> clazz = target.getClass();
- if (target instanceof SSLServerSocket) {
- // overcome illegal access issues with internal implementation class
- clazz = SSLServerSocket.class;
- }
- Method setter = findSetterMethod(clazz, name);
- if (setter == null) {
- return false;
- }
-
- // If the type is null or it matches the needed type, just use the
- // value directly
- if (value == null || value.getClass() == setter.getParameterTypes()[0]) {
- setter.invoke(target, value);
- } else {
- // We need to convert it
- setter.invoke(target, convert(value, setter.getParameterTypes()[0]));
- }
- return true;
- } catch (Exception e) {
- LOG.error(String.format("Could not set property %s on %s", name, target), e);
- return false;
- }
- }
-
- @SuppressWarnings({
- "rawtypes", "unchecked"
- })
- private static Object convert(Object value, Class to) {
- if (value == null) {
- // lets avoid NullPointerException when converting to boolean for null values
- if (boolean.class.isAssignableFrom(to)) {
- return Boolean.FALSE;
- }
- return null;
- }
-
- // eager same instance type test to avoid the overhead of invoking the type converter
- // if already same type
- if (to.isAssignableFrom(value.getClass())) {
- return to.cast(value);
- }
-
- if (boolean.class.isAssignableFrom(to) && value instanceof String) {
- return Boolean.valueOf((String) value);
- }
-
- throw new IllegalArgumentException("Cannot convert from " + value.getClass()
- + " to " + to + " with value " + value);
- }
-
- private static Method findSetterMethod(Class<?> clazz, String name) {
- // Build the method name.
- name = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
- Method[] methods = clazz.getMethods();
- for (Method method : methods) {
- Class<?> params[] = method.getParameterTypes();
- if (method.getName().equals(name) && params.length == 1 ) {
- return method;
- }
- }
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnection.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnection.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnection.java
deleted file mode 100755
index 38ccf6a..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnection.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-/**
- * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
- * {@link QueueConnection} which is pooled and on {@link #close()} will return
- * itself to the sessionPool.
- *
- * <b>NOTE</b> this implementation is only intended for use when sending
- * messages. It does not deal with pooling of consumers; for that look at a
- * library like <a href="http://jencks.org/">Jencks</a> such as in <a
- * href="http://jencks.org/Message+Driven+POJOs">this example</a>
- *
- */
-public class PooledConnection implements TopicConnection, QueueConnection, PooledSessionEventListener {
- private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
-
- protected ConnectionPool pool;
- private volatile boolean stopped;
- private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<>();
- private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<>();
- private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<>();
-
- /**
- * Creates a new PooledConnection instance that uses the given ConnectionPool to create
- * and manage its resources. The ConnectionPool instance can be shared amongst many
- * PooledConnection instances.
- *
- * @param pool
- * The connection and pool manager backing this proxy connection object.
- */
- public PooledConnection(ConnectionPool pool) {
- this.pool = pool;
- }
-
- /**
- * Factory method to create a new instance.
- */
- public PooledConnection newInstance() {
- return new PooledConnection(pool);
- }
-
- @Override
- public void close() throws JMSException {
- this.cleanupConnectionTemporaryDestinations();
- this.cleanupAllLoanedSessions();
- if (this.pool != null) {
- this.pool.decrementReferenceCount();
- this.pool = null;
- }
- }
-
- @Override
- public void start() throws JMSException {
- assertNotClosed();
- pool.start();
- }
-
- @Override
- public void stop() throws JMSException {
- stopped = true;
- }
-
- @Override
- public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
- return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
- }
-
- @Override
- public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
- return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
- }
-
- @Override
- public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
- return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
- }
-
- @Override
- public String getClientID() throws JMSException {
- return getConnection().getClientID();
- }
-
- @Override
- public ExceptionListener getExceptionListener() throws JMSException {
- return getConnection().getExceptionListener();
- }
-
- @Override
- public ConnectionMetaData getMetaData() throws JMSException {
- return getConnection().getMetaData();
- }
-
- @Override
- public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
- getConnection().setExceptionListener(exceptionListener);
- }
-
- @Override
- public void setClientID(String clientID) throws JMSException {
- // ignore repeated calls to setClientID() with the same client id
- // this could happen when a JMS component such as Spring that uses a
- // PooledConnectionFactory shuts down and reinitializes.
- if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
- getConnection().setClientID(clientID);
- }
- }
-
- @Override
- public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
- return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
- }
-
- // Session factory methods
- // -------------------------------------------------------------------------
- @Override
- public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
- return (QueueSession) createSession(transacted, ackMode);
- }
-
- @Override
- public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
- return (TopicSession) createSession(transacted, ackMode);
- }
-
- @Override
- public Session createSession(boolean transacted, int ackMode) throws JMSException {
- PooledSession result;
- result = (PooledSession) pool.createSession(transacted, ackMode);
-
- // Store the session so we can close the sessions that this PooledConnection
- // created in order to ensure that consumers etc are closed per the JMS contract.
- loanedSessions.add(result);
-
- // Add a event listener to the session that notifies us when the session
- // creates / destroys temporary destinations and closes etc.
- result.addSessionEventListener(this);
- return result;
- }
-
- // Implementation methods
- // -------------------------------------------------------------------------
-
- @Override
- public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
- connTempQueues.add(tempQueue);
- }
-
- @Override
- public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
- connTempTopics.add(tempTopic);
- }
-
- @Override
- public void onSessionClosed(PooledSession session) {
- if (session != null) {
- this.loanedSessions.remove(session);
- }
- }
-
- public Connection getConnection() throws JMSException {
- assertNotClosed();
- return pool.getConnection();
- }
-
- protected void assertNotClosed() throws javax.jms.IllegalStateException {
- if (stopped || pool == null) {
- throw new javax.jms.IllegalStateException("Connection closed");
- }
- }
-
- protected Session createSession(SessionKey key) throws JMSException {
- return getConnection().createSession(key.isTransacted(), key.getAckMode());
- }
-
- @Override
- public String toString() {
- return "PooledConnection { " + pool + " }";
- }
-
- /**
- * Remove all of the temporary destinations created for this connection.
- * This is important since the underlying connection may be reused over a
- * long period of time, accumulating all of the temporary destinations from
- * each use. However, from the perspective of the lifecycle from the
- * client's view, close() closes the connection and, therefore, deletes all
- * of the temporary destinations created.
- */
- protected void cleanupConnectionTemporaryDestinations() {
-
- for (TemporaryQueue tempQueue : connTempQueues) {
- try {
- tempQueue.delete();
- } catch (JMSException ex) {
- LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
- }
- }
- connTempQueues.clear();
-
- for (TemporaryTopic tempTopic : connTempTopics) {
- try {
- tempTopic.delete();
- } catch (JMSException ex) {
- LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
- }
- }
- connTempTopics.clear();
- }
-
- /**
- * The PooledSession tracks all Sessions that it created and now we close them. Closing the
- * PooledSession will return the internal Session to the Pool of Session after cleaning up
- * all the resources that the Session had allocated for this PooledConnection.
- */
- protected void cleanupAllLoanedSessions() {
-
- for (PooledSession session : loanedSessions) {
- try {
- session.close();
- } catch (JMSException ex) {
- LOG.info("failed to close laoned Session \"" + session + "\" on closing pooled connection: " + ex.getMessage());
- }
- }
- loanedSessions.clear();
- }
-
- /**
- * @return the total number of Pooled session including idle sessions that are not
- * currently loaned out to any client.
- */
- public int getNumSessions() {
- return this.pool.getNumSessions();
- }
-
- /**
- * @return the number of Sessions that are currently checked out of this Connection's session pool.
- */
- public int getNumActiveSessions() {
- return this.pool.getNumActiveSessions();
- }
-
- /**
- * @return the number of Sessions that are idle in this Connection's sessions pool.
- */
- public int getNumtIdleSessions() {
- return this.pool.getNumIdleSessions();
- }
-}