You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by rm...@apache.org on 2016/03/15 19:41:44 UTC
[3/3] tomee git commit: TOMEE-1735 starting JMS2 implementation
TOMEE-1735 starting JMS2 implementation
Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/1bf768b7
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/1bf768b7
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/1bf768b7
Branch: refs/heads/master
Commit: 1bf768b76a3f6fc5fb910b60d02a7b18a527561e
Parents: a329a6e
Author: Romain manni-Bucau <rm...@gmail.com>
Authored: Tue Mar 15 19:41:17 2016 +0100
Committer: Romain manni-Bucau <rm...@gmail.com>
Committed: Tue Mar 15 19:41:17 2016 +0100
----------------------------------------------------------------------
.../org/apache/openejb/cdi/CdiBeanInfo.java | 12 +
.../openejb/cdi/OptimizedLoaderService.java | 16 +-
.../openejb/config/AnnotationDeployer.java | 56 ++
.../openejb/config/BaseConvertDefinitions.java | 66 +++
.../openejb/config/ConfigurationFactory.java | 1 +
.../config/ConvertDataSourceDefinitions.java | 41 +-
.../ConvertJMSConnectionFactoryDefinitions.java | 111 ++++
.../apache/openejb/config/ReadDescriptors.java | 2 +-
.../openejb/resource/AutoConnectionTracker.java | 10 +-
.../activemq/ActiveMQResourceAdapter.java | 11 +
.../resource/activemq/jms2/DelegateMessage.java | 281 ++++++++++
.../openejb/resource/activemq/jms2/JMS2.java | 115 ++++
.../resource/activemq/jms2/JMSConsumerImpl.java | 159 ++++++
.../resource/activemq/jms2/JMSContextImpl.java | 535 +++++++++++++++++++
.../resource/activemq/jms2/JMSProducerImpl.java | 501 +++++++++++++++++
.../resource/activemq/jms2/TomEEConnection.java | 57 ++
.../activemq/jms2/TomEEConnectionFactory.java | 51 ++
.../activemq/jms2/TomEEManagedConnection.java | 60 +++
.../jms2/TomEEManagedConnectionFactory.java | 70 +++
.../jms2/TomEEManagedConnectionProxy.java | 72 +++
.../activemq/jms2/TomEERAConnectionFactory.java | 52 ++
.../activemq/jms2/TomEEXAConnection.java | 42 ++
.../activemq/jms2/TomEEXAConnectionFactory.java | 76 +++
.../activemq/jms2/WrappingByteMessage.java | 184 +++++++
.../activemq/jms2/WrappingMapMessage.java | 173 ++++++
.../activemq/jms2/WrappingObjectMessage.java | 50 ++
.../activemq/jms2/WrappingStreamMessage.java | 160 ++++++
.../activemq/jms2/WrappingTextMessage.java | 49 ++
.../activemq/jms2/XAJMSContextImpl.java | 39 ++
.../activemq/jms2/cdi/JMS2CDIExtension.java | 465 ++++++++++++++++
.../META-INF/org.apache.openejb/service-jar.xml | 2 +-
.../apache/openejb/activemq/JMS2AMQTest.java | 314 +++++++++++
.../activemq/JMSConnectionFactoryTest.java | 74 +++
.../openejb/jee/JMSConnectionFactory$JAXB.java | 244 +++++++++
.../org/apache/openejb/jee/WebApp$JAXB.java | 14 +
.../org/apache/openejb/jee/Application.java | 12 +
.../apache/openejb/jee/ApplicationClient.java | 12 +
.../java/org/apache/openejb/jee/EntityBean.java | 12 +
.../org/apache/openejb/jee/Interceptor.java | 10 +
.../openejb/jee/JMSConnectionFactory.java | 187 +++++++
.../org/apache/openejb/jee/JndiConsumer.java | 20 +-
.../apache/openejb/jee/MessageDrivenBean.java | 12 +
.../org/apache/openejb/jee/SessionBean.java | 12 +
.../java/org/apache/openejb/jee/WebApp.java | 27 +
.../org/apache/openejb/jee/WebFragment.java | 12 +
45 files changed, 4429 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/cdi/CdiBeanInfo.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/cdi/CdiBeanInfo.java b/container/openejb-core/src/main/java/org/apache/openejb/cdi/CdiBeanInfo.java
index ec88345..22eac30 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/cdi/CdiBeanInfo.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/cdi/CdiBeanInfo.java
@@ -22,6 +22,7 @@ import org.apache.openejb.jee.DataSource;
import org.apache.openejb.jee.EjbLocalRef;
import org.apache.openejb.jee.EjbRef;
import org.apache.openejb.jee.EnvEntry;
+import org.apache.openejb.jee.JMSConnectionFactory;
import org.apache.openejb.jee.JndiConsumer;
import org.apache.openejb.jee.KeyedCollection;
import org.apache.openejb.jee.LifecycleCallback;
@@ -53,6 +54,7 @@ public class CdiBeanInfo implements JndiConsumer {
protected List<LifecycleCallback> postConstruct;
protected List<LifecycleCallback> preDestroy;
protected KeyedCollection<String, DataSource> dataSource;
+ protected KeyedCollection<String, JMSConnectionFactory> jmsConnectionFactories;
protected List<LifecycleCallback> postActivate;
protected List<LifecycleCallback> prePassivate;
protected List<SecurityRoleRef> securityRoleRef;
@@ -282,6 +284,16 @@ public class CdiBeanInfo implements JndiConsumer {
return this.dataSource.toMap();
}
+    /**
+     * Returns the JMS connection factory definitions attached to this bean.
+     * The backing collection is created on first access, so this never returns {@code null}.
+     */
+    @Override
+    public Collection<JMSConnectionFactory> getJMSConnectionFactories() {
+        if (jmsConnectionFactories == null) {
+            jmsConnectionFactories = new KeyedCollection<>();
+        }
+        return jmsConnectionFactories;
+    }
+
+    /**
+     * Exposes the result of {@link #getJMSConnectionFactories()} through the
+     * {@code toMap()} view of the underlying {@link KeyedCollection}.
+     */
+    @Override
+    public Map<String, JMSConnectionFactory> getJMSConnectionFactoriesMap() {
+        // the getter is declared as a plain Collection, hence the (raw) cast back to KeyedCollection
+        final KeyedCollection keyed = KeyedCollection.class.cast(getJMSConnectionFactories());
+        return keyed.toMap();
+    }
+
public String getJndiConsumerName() {
return beanName;
}
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/cdi/OptimizedLoaderService.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/cdi/OptimizedLoaderService.java b/container/openejb-core/src/main/java/org/apache/openejb/cdi/OptimizedLoaderService.java
index 1317520..9051c6f 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/cdi/OptimizedLoaderService.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/cdi/OptimizedLoaderService.java
@@ -19,6 +19,7 @@ package org.apache.openejb.cdi;
import org.apache.openejb.core.ParentClassLoaderFinder;
import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.resource.activemq.jms2.cdi.JMS2CDIExtension;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
import org.apache.openejb.util.classloader.ClassLoaderAwareHandler;
@@ -87,6 +88,10 @@ public class OptimizedLoaderService implements LoaderService {
}
}
+ if (hasJms()) {
+ list.add(new JMS2CDIExtension());
+ }
+
final Collection<Extension> extensionCopy = new ArrayList<>(list);
final Iterator<Extension> it = list.iterator();
@@ -100,6 +105,15 @@ public class OptimizedLoaderService implements LoaderService {
return list;
}
+ private boolean hasJms() {
+ try {
+ Thread.currentThread().getContextClassLoader().loadClass("org.apache.activemq.ra.ActiveMQManagedConnectionFactory");
+ return true;
+ } catch (final NoClassDefFoundError | ClassNotFoundException e) {
+ return false;
+ }
+ }
+
// mainly intended to avoid conflicts between internal and overrided spec extensions
private boolean isFiltered(final Collection<Extension> extensions, final Extension next) {
final ClassLoader containerLoader = ParentClassLoaderFinder.Helper.get();
@@ -167,7 +181,7 @@ public class OptimizedLoaderService implements LoaderService {
clazz = loader.loadClass("org.apache.webbeans.jsf.plugin.OpenWebBeansJsfPlugin");
try {
list.add(OpenWebBeansPlugin.class.cast(
- Proxy.newProxyInstance(loader, new Class<?>[]{OpenWebBeansPlugin.class}, new ClassLoaderAwareHandler(clazz.getSimpleName(), clazz.newInstance(), loader))));
+ Proxy.newProxyInstance(loader, new Class<?>[]{OpenWebBeansPlugin.class}, new ClassLoaderAwareHandler(clazz.getSimpleName(), clazz.newInstance(), loader))));
} catch (final Exception e) {
log.error("Unable to load OpenWebBeansPlugin: OpenWebBeansJsfPlugin");
}
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/config/AnnotationDeployer.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/config/AnnotationDeployer.java b/container/openejb-core/src/main/java/org/apache/openejb/config/AnnotationDeployer.java
index 2d4569a..1d035ba 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/config/AnnotationDeployer.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/config/AnnotationDeployer.java
@@ -68,6 +68,7 @@ import org.apache.openejb.jee.Interceptor;
import org.apache.openejb.jee.InterceptorBinding;
import org.apache.openejb.jee.Invokable;
import org.apache.openejb.jee.IsolationLevel;
+import org.apache.openejb.jee.JMSConnectionFactory;
import org.apache.openejb.jee.JndiConsumer;
import org.apache.openejb.jee.JndiReference;
import org.apache.openejb.jee.License;
@@ -202,6 +203,8 @@ import javax.enterprise.inject.spi.Extension;
import javax.interceptor.ExcludeClassInterceptors;
import javax.interceptor.ExcludeDefaultInterceptors;
import javax.interceptor.Interceptors;
+import javax.jms.JMSConnectionFactoryDefinition;
+import javax.jms.JMSConnectionFactoryDefinitions;
import javax.jms.Queue;
import javax.jws.HandlerChain;
import javax.jws.WebService;
@@ -4000,6 +4003,22 @@ public class AnnotationDeployer implements DynamicDeployer {
final DataSourceDefinition definition = annotated.getAnnotation(DataSourceDefinition.class);
buildDataSourceDefinition(consumer, definition);
}
+
+ //
+ // @JMSConnectionFactoryDefinition
+ //
+
+ for (final Annotated<Class<?>> annotated : annotationFinder.findMetaAnnotatedClasses(JMSConnectionFactoryDefinitions.class)) {
+ final JMSConnectionFactoryDefinitions defs = annotated.getAnnotation(JMSConnectionFactoryDefinitions.class);
+ for (final JMSConnectionFactoryDefinition definition : defs.value()) {
+ buildConnectionFactoryDefinition(consumer, definition);
+ }
+ }
+
+ for (final Annotated<Class<?>> annotated : annotationFinder.findMetaAnnotatedClasses(JMSConnectionFactoryDefinition.class)) {
+ final JMSConnectionFactoryDefinition definition = annotated.getAnnotation(JMSConnectionFactoryDefinition.class);
+ buildConnectionFactoryDefinition(consumer, definition);
+ }
}
private void buildContext(final JndiConsumer consumer, final Member member) {
@@ -4651,6 +4670,43 @@ public class AnnotationDeployer implements DynamicDeployer {
}
}
+    /**
+     * Converts a {@code @JMSConnectionFactoryDefinition} annotation into a
+     * {@link JMSConnectionFactory} descriptor and registers it on the consumer.
+     *
+     * Each entry of {@code definition.properties()} is expected to look like {@code key=value}.
+     * Entries are parsed with {@link SuperProperties}; an entry which is empty or ends with its
+     * first {@code '='} is registered as a key with an empty value. Entries resolving to an empty
+     * key are ignored.
+     *
+     * @param consumer   the JNDI consumer receiving the definition
+     * @param definition the annotation to convert
+     */
+    private void buildConnectionFactoryDefinition(final JndiConsumer consumer, final JMSConnectionFactoryDefinition definition) {
+        final JMSConnectionFactory connectionFactory = new JMSConnectionFactory();
+        connectionFactory.setName(definition.name());
+        connectionFactory.setMinPoolSize(definition.minPoolSize());
+        connectionFactory.setMaxPoolSize(definition.maxPoolSize());
+        connectionFactory.setClassName(definition.className());
+        connectionFactory.setInterfaceName(definition.interfaceName());
+        connectionFactory.setClientId(definition.clientId());
+        connectionFactory.setUser(definition.user());
+        connectionFactory.setPassword(definition.password());
+        connectionFactory.setResourceAdapter(definition.resourceAdapter());
+        connectionFactory.setTransactional(definition.transactional());
+
+        for (final String s : definition.properties()) {
+            final int equal = s.indexOf('=');
+            if (equal < s.length() - 1) {
+                final SuperProperties props = new SuperProperties();
+                try {
+                    props.load(new ByteArrayInputStream(s.getBytes()));
+                    for (final String key : props.stringPropertyNames()) {
+                        if (!key.isEmpty()) {
+                            connectionFactory.property(key, props.getProperty(key));
+                        }
+                    }
+                } catch (final IOException e) {
+                    // manual fallback: without any '=' the whole entry is the key
+                    final String key = (equal < 0 ? s : s.substring(0, equal)).trim();
+                    final String value = equal < 0 ? "" : s.substring(equal + 1).trim();
+                    if (!key.isEmpty()) {
+                        connectionFactory.property(key, value);
+                    }
+                }
+            } else {
+                // only reached for an empty entry or one ending with its first '=':
+                // drop the separator so "foo=" is stored as key "foo" and not "foo="
+                final String key = (equal < 0 ? s : s.substring(0, equal)).trim();
+                if (!key.isEmpty()) {
+                    connectionFactory.property(key, "");
+                }
+            }
+        }
+
+        consumer.getJMSConnectionFactories().add(connectionFactory);
+    }
+
+
private void buildDataSourceDefinition(final JndiConsumer consumer, final DataSourceDefinition d) {
final DataSource dataSource = new DataSource();
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/config/BaseConvertDefinitions.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/config/BaseConvertDefinitions.java b/container/openejb-core/src/main/java/org/apache/openejb/config/BaseConvertDefinitions.java
new file mode 100644
index 0000000..c543c2d
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/config/BaseConvertDefinitions.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openejb.config;
+
+import org.apache.openejb.jee.JndiConsumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Common base of the deployers converting resource definitions found on JNDI consumers
+ * into application resources.
+ */
+public abstract class BaseConvertDefinitions implements DynamicDeployer {
+    /**
+     * Removes the first occurrence of {@code java:comp/env/}, then of {@code java:/},
+     * then of {@code java:} from the given name.
+     *
+     * @param factory the declared (JNDI) name
+     * @return the name without those markers
+     */
+    protected String cleanUpName(final String factory) {
+        return factory
+            .replaceFirst("java:comp/env/", "")
+            .replaceFirst("java:/", "")
+            .replaceFirst("java:", "");
+    }
+
+    /**
+     * Lists the JNDI consumers of an application: application clients, web apps,
+     * enterprise beans of each EJB module and finally the application itself.
+     * Missing (null) application clients, web apps and application are left out.
+     *
+     * @param appModule the application to inspect
+     * @return a new, mutable list of consumers
+     */
+    protected List<JndiConsumer> collectConsumers(final AppModule appModule) {
+        final List<JndiConsumer> consumers = new ArrayList<>();
+
+        for (final ClientModule clientModule : appModule.getClientModules()) {
+            final JndiConsumer client = clientModule.getApplicationClient();
+            if (client != null) {
+                consumers.add(client);
+            }
+        }
+
+        for (final WebModule web : appModule.getWebModules()) {
+            final JndiConsumer webApp = web.getWebApp();
+            if (webApp != null) {
+                consumers.add(webApp);
+            }
+        }
+
+        for (final EjbModule ejb : appModule.getEjbModules()) {
+            Collections.addAll(consumers, ejb.getEjbJar().getEnterpriseBeans());
+        }
+
+        if (appModule.getApplication() != null) {
+            consumers.add(appModule.getApplication());
+        }
+
+        return consumers;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/config/ConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/config/ConfigurationFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/config/ConfigurationFactory.java
index c3196f0..8e06e20 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/config/ConfigurationFactory.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/config/ConfigurationFactory.java
@@ -285,6 +285,7 @@ public class ConfigurationFactory implements OpenEjbConfigurationFactory {
}
chain.add(new ConvertDataSourceDefinitions());
+ chain.add(new ConvertJMSConnectionFactoryDefinitions());
chain.add(new CleanEnvEntries());
chain.add(new LinkBuiltInTypes());
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/config/ConvertDataSourceDefinitions.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/config/ConvertDataSourceDefinitions.java b/container/openejb-core/src/main/java/org/apache/openejb/config/ConvertDataSourceDefinitions.java
index 5353379..babd933 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/config/ConvertDataSourceDefinitions.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/config/ConvertDataSourceDefinitions.java
@@ -28,15 +28,13 @@ import org.apache.openejb.util.PropertyPlaceHolderHelper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
* @version $Rev$ $Date$
*/
-public class ConvertDataSourceDefinitions implements DynamicDeployer {
+public class ConvertDataSourceDefinitions extends BaseConvertDefinitions {
@Override
public AppModule deploy(final AppModule appModule) throws OpenEJBException {
@@ -61,10 +59,7 @@ public class ConvertDataSourceDefinitions implements DynamicDeployer {
private Resource toResource(final DataSource datasource) {
- String name = datasource.getName();
- name = name.replaceFirst("java:comp/env/", "");
- name = name.replaceFirst("java:/", "");
- name = name.replaceFirst("java:", "");
+ final String name = cleanUpName(datasource.getName());
final Resource def = new Resource(name, javax.sql.DataSource.class.getName());
@@ -148,36 +143,4 @@ public class ConvertDataSourceDefinitions implements DynamicDeployer {
properties.put(key, PropertyPlaceHolderHelper.value(String.valueOf(value)));
}
-
- private List<JndiConsumer> collectConsumers(final AppModule appModule) {
-
- final List<JndiConsumer> jndiConsumers = new ArrayList<JndiConsumer>();
-
- for (final ClientModule module : appModule.getClientModules()) {
- final JndiConsumer consumer = module.getApplicationClient();
- if (consumer == null) {
- continue;
- }
- jndiConsumers.add(consumer);
- }
-
- for (final WebModule webModule : appModule.getWebModules()) {
- final JndiConsumer consumer = webModule.getWebApp();
- if (consumer == null) {
- continue;
- }
- jndiConsumers.add(consumer);
- }
-
- for (final EjbModule ejbModule : appModule.getEjbModules()) {
- Collections.addAll(jndiConsumers, ejbModule.getEjbJar().getEnterpriseBeans());
- }
-
- if (appModule.getApplication() != null) {
- jndiConsumers.add(appModule.getApplication());
- }
-
- return jndiConsumers;
- }
-
}
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/config/ConvertJMSConnectionFactoryDefinitions.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/config/ConvertJMSConnectionFactoryDefinitions.java b/container/openejb-core/src/main/java/org/apache/openejb/config/ConvertJMSConnectionFactoryDefinitions.java
new file mode 100644
index 0000000..f343829
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/config/ConvertJMSConnectionFactoryDefinitions.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openejb.config;
+
+import org.apache.openejb.OpenEJBException;
+import org.apache.openejb.config.sys.Resource;
+import org.apache.openejb.jee.JMSConnectionFactory;
+import org.apache.openejb.jee.JndiConsumer;
+import org.apache.openejb.jee.KeyedCollection;
+import org.apache.openejb.jee.Property;
+import org.apache.openejb.util.PropertyPlaceHolderHelper;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Deployer converting the {@link JMSConnectionFactory} definitions declared on the JNDI
+ * consumers of an application into {@link Resource}s of that application.
+ */
+public class ConvertJMSConnectionFactoryDefinitions extends BaseConvertDefinitions {
+    /**
+     * Collects the connection factory definitions of every JNDI consumer of the module and
+     * adds one resource per collected definition to the module.
+     *
+     * @param appModule the application being deployed
+     * @return the same module instance
+     * @throws OpenEJBException declared by the deployer contract
+     */
+    @Override
+    public AppModule deploy(final AppModule appModule) throws OpenEJBException {
+        final List<JndiConsumer> jndiConsumers = collectConsumers(appModule);
+        final KeyedCollection<String, JMSConnectionFactory> factories = new KeyedCollection<>();
+        for (final JndiConsumer consumer : jndiConsumers) {
+            if (consumer != null) {
+                factories.addAll(consumer.getJMSConnectionFactories());
+            }
+        }
+        for (final JMSConnectionFactory factory : factories) {
+            appModule.getResources().add(toResource(factory));
+        }
+        return appModule;
+    }
+
+    /**
+     * Builds the resource matching a connection factory definition.
+     *
+     * The resource type is the declared interface name or {@code javax.jms.ConnectionFactory}
+     * when none is given. Pool sizes are only propagated when strictly positive, and string
+     * attributes only when non-empty. Custom properties of the definition are applied last
+     * and therefore override the computed ones.
+     *
+     * @param factory the definition to convert
+     * @return the corresponding resource
+     */
+    private Resource toResource(final JMSConnectionFactory factory) {
+        final String name = cleanUpName(factory.getName());
+
+        final Resource factoryResource = new Resource(name, javax.jms.ConnectionFactory.class.getName());
+
+        factoryResource.setJndi(factory.getName().replaceFirst("java:", ""));
+        factoryResource.setType(
+            factory.getInterfaceName() != null && !factory.getInterfaceName().isEmpty() ?
+                factory.getInterfaceName() : "javax.jms.ConnectionFactory");
+        if (factory.getClassName() != null && !factory.getClassName().isEmpty()) {
+            factoryResource.setClassName(factory.getClassName());
+        }
+
+        final Properties p = factoryResource.getProperties();
+        put(p, AutoConfig.ORIGIN_FLAG, AutoConfig.ORIGIN_ANNOTATION);
+        put(p, "JndiName", factoryResource.getJndi());
+        if (factory.getResourceAdapter() != null && !factory.getResourceAdapter().isEmpty()) {
+            put(p, "ResourceAdapter", factory.getResourceAdapter());
+        }
+        if (factory.isTransactional()) {
+            put(p, "TransactionSupport", "xa");
+        } else {
+            put(p, "TransactionSupport", "none");
+        }
+        if (factory.getMaxPoolSize() != null && factory.getMaxPoolSize() > 0) {
+            put(p, "PoolMaxSize", factory.getMaxPoolSize());
+        }
+        if (factory.getMinPoolSize() != null && factory.getMinPoolSize() > 0) {
+            put(p, "PoolMinSize", factory.getMinPoolSize());
+        }
+        if (factory.getUser() != null && !factory.getUser().isEmpty()) {
+            put(p, "UserName", factory.getUser());
+        }
+        if (factory.getPassword() != null && !factory.getPassword().isEmpty()) {
+            // the password itself, not the user name
+            put(p, "Password", factory.getPassword());
+        }
+        if (factory.getClientId() != null && !factory.getClientId().isEmpty()) {
+            // the client id itself, not the user name
+            put(p, "Clientid", factory.getClientId());
+        }
+
+        setProperties(factory, p);
+        return factoryResource;
+    }
+
+    /**
+     * Copies the custom properties of the definition into the resource properties.
+     *
+     * @param d the definition holding the custom properties
+     * @param p the resource properties to complete
+     */
+    private void setProperties(final JMSConnectionFactory d, final Properties p) {
+        for (final Property property : d.getProperty()) {
+
+            final String key = property.getName();
+            final String value = property.getValue();
+
+            put(p, key, value);
+        }
+    }
+
+    /**
+     * Stores the string form of {@code value}, passed through
+     * {@link PropertyPlaceHolderHelper#value(String)}, under {@code key}.
+     * Does nothing when the key or the value is {@code null}.
+     */
+    private static void put(final Properties properties, final String key, final Object value) {
+        if (key == null) {
+            return;
+        }
+        if (value == null) {
+            return;
+        }
+        properties.put(key, PropertyPlaceHolderHelper.value(String.valueOf(value)));
+    }
+}
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/config/ReadDescriptors.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/config/ReadDescriptors.java b/container/openejb-core/src/main/java/org/apache/openejb/config/ReadDescriptors.java
index b2bd6fa..c60521a 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/config/ReadDescriptors.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/config/ReadDescriptors.java
@@ -75,7 +75,6 @@ import javax.xml.bind.JAXBException;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;
-
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
@@ -614,6 +613,7 @@ public class ReadDescriptors implements DynamicDeployer {
mergeOnlyMissingEntries(webModule.getWebApp().getPersistenceUnitRefMap(), webFragment.getPersistenceUnitRef());
mergeOnlyMissingEntries(webModule.getWebApp().getMessageDestinationRefMap(), webFragment.getMessageDestinationRef());
mergeOnlyMissingEntries(webModule.getWebApp().getDataSourceMap(), webFragment.getDataSource());
+ mergeOnlyMissingEntries(webModule.getWebApp().getJMSConnectionFactoriesMap(), webFragment.getJMSConnectionFactories());
mergeOnlyMissingEntries(webModule.getWebApp().getEjbLocalRefMap(), webFragment.getEjbLocalRef());
mergeOnlyMissingEntries(webModule.getWebApp().getEjbRefMap(), webFragment.getEjbRef());
mergeOnlyMissingEntries(webModule.getWebApp().getServiceRefMap(), webFragment.getServiceRef());
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
index f71c5b2..02636ab 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
@@ -22,6 +22,8 @@ import org.apache.geronimo.connector.outbound.ConnectionReturnAction;
import org.apache.geronimo.connector.outbound.ConnectionTrackingInterceptor;
import org.apache.geronimo.connector.outbound.ManagedConnectionInfo;
import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker;
+import org.apache.openejb.util.LogCategory;
+import org.apache.openejb.util.Logger;
import javax.resource.ResourceException;
import javax.resource.spi.DissociatableManagedConnection;
@@ -126,11 +128,15 @@ public class AutoConnectionTracker implements ConnectionTracker {
}
try {
- final Object value = method.invoke(handle, args);
- return value;
+ return method.invoke(handle, args);
} catch (final InvocationTargetException ite) {
// catch InvocationTargetExceptions and turn them into the target exception (if there is one)
final Throwable t = ite.getTargetException();
+ if (AbstractMethodError.class.isInstance(t)) {
+ // "debug" info
+ Logger.getInstance(LogCategory.OPENEJB, AutoConnectionTracker.class)
+ .error("Missing method: " + method + " on " + handle);
+ }
if (t != null) {
throw t;
}
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
index 8880875..d30beb4 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
@@ -17,7 +17,11 @@
package org.apache.openejb.resource.activemq;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.ra.ActiveMQConnectionRequestInfo;
+import org.apache.activemq.ra.MessageActivationSpec;
+import org.apache.openejb.resource.activemq.jms2.TomEEConnectionFactory;
import org.apache.openejb.util.Duration;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
@@ -183,6 +187,13 @@ public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQReso
}
}
+    /**
+     * Creates the connection factory used by the adapter as a {@link TomEEConnectionFactory}
+     * and lets the request info configure it.
+     *
+     * @param connectionRequestInfo settings applied to the new factory
+     * @param activationSpec        activation spec handed to the configuration step
+     * @return the configured factory
+     */
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory(final ActiveMQConnectionRequestInfo connectionRequestInfo, final MessageActivationSpec activationSpec) {
+        final ActiveMQConnectionFactory tomeeFactory = new TomEEConnectionFactory();
+        connectionRequestInfo.configure(tomeeFactory, activationSpec);
+        return tomeeFactory;
+    }
+
+
private void stopImpl() throws Exception {
super.stop();
final Collection<BrokerService> brokers = ActiveMQFactory.getBrokers();
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/DelegateMessage.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/DelegateMessage.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/DelegateMessage.java
new file mode 100644
index 0000000..a7efd81
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/DelegateMessage.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq.jms2;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.util.Enumeration;
+
+/**
+ * Used to wrap a JMS 1.0 message and provide a JMS 2.0 impl.
+ * <p>
+ * Every JMS 1.x accessor below is forwarded unchanged to the wrapped message;
+ * the JMS 2.0 delivery time is the only state stored in this wrapper itself.
+ */
+public class DelegateMessage implements Message {
+ private final Message message; // the wrapped message every forwarded call goes to
+ private long deliveryTime; // held by this wrapper only, never pushed to the wrapped message
+
+ public DelegateMessage(final Message message) {
+ this.message = message;
+ }
+
+ /*
+  * JMS 2.0 additions: the delivery time is read from / written to the local field,
+  * getBody and isBodyAssignableTo are forwarded to the wrapped message.
+  */
+
+ @Override
+ public long getJMSDeliveryTime() throws JMSException {
+ return deliveryTime;
+ }
+
+ @Override
+ public void setJMSDeliveryTime(final long deliveryTime) throws JMSException {
+ this.deliveryTime = deliveryTime;
+ }
+
+ @Override
+ public <T> T getBody(final Class<T> c) throws JMSException {
+ return message.getBody(c); // NOTE(review): presumably fails if the wrapped message only implements JMS 1.x - confirm
+ }
+
+ @Override
+ public boolean isBodyAssignableTo(final Class c) throws JMSException {
+ return message.isBodyAssignableTo(c); // NOTE(review): same JMS 1.x concern as getBody - confirm
+ }
+
+ // delegation to JMS 1.0: each method below only forwards to the wrapped message
+
+ @Override
+ public String getJMSMessageID() throws JMSException {
+ return message.getJMSMessageID();
+ }
+
+ @Override
+ public void setJMSMessageID(final String id) throws JMSException {
+ message.setJMSMessageID(id);
+ }
+
+ @Override
+ public long getJMSTimestamp() throws JMSException {
+ return message.getJMSTimestamp();
+ }
+
+ @Override
+ public void setJMSTimestamp(final long timestamp) throws JMSException {
+ message.setJMSTimestamp(timestamp);
+ }
+
+ @Override
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+ return message.getJMSCorrelationIDAsBytes();
+ }
+
+ @Override
+ public void setJMSCorrelationIDAsBytes(final byte[] correlationID) throws JMSException {
+ message.setJMSCorrelationIDAsBytes(correlationID);
+ }
+
+ @Override
+ public void setJMSCorrelationID(final String correlationID) throws JMSException {
+ message.setJMSCorrelationID(correlationID);
+ }
+
+ @Override
+ public String getJMSCorrelationID() throws JMSException {
+ return message.getJMSCorrelationID();
+ }
+
+ @Override
+ public Destination getJMSReplyTo() throws JMSException {
+ return message.getJMSReplyTo();
+ }
+
+ @Override
+ public void setJMSReplyTo(final Destination replyTo) throws JMSException {
+ message.setJMSReplyTo(replyTo);
+ }
+
+ @Override
+ public Destination getJMSDestination() throws JMSException {
+ return message.getJMSDestination();
+ }
+
+ @Override
+ public void setJMSDestination(final Destination destination) throws JMSException {
+ message.setJMSDestination(destination);
+ }
+
+ @Override
+ public int getJMSDeliveryMode() throws JMSException {
+ return message.getJMSDeliveryMode();
+ }
+
+ @Override
+ public void setJMSDeliveryMode(final int deliveryMode) throws JMSException {
+ message.setJMSDeliveryMode(deliveryMode);
+ }
+
+ @Override
+ public boolean getJMSRedelivered() throws JMSException {
+ return message.getJMSRedelivered();
+ }
+
+ @Override
+ public void setJMSRedelivered(final boolean redelivered) throws JMSException {
+ message.setJMSRedelivered(redelivered);
+ }
+
+ @Override
+ public String getJMSType() throws JMSException {
+ return message.getJMSType();
+ }
+
+ @Override
+ public void setJMSType(final String type) throws JMSException {
+ message.setJMSType(type);
+ }
+
+ @Override
+ public long getJMSExpiration() throws JMSException {
+ return message.getJMSExpiration();
+ }
+
+ @Override
+ public void setJMSExpiration(final long expiration) throws JMSException {
+ message.setJMSExpiration(expiration);
+ }
+
+ @Override
+ public int getJMSPriority() throws JMSException {
+ return message.getJMSPriority();
+ }
+
+ @Override
+ public void setJMSPriority(final int priority) throws JMSException {
+ message.setJMSPriority(priority);
+ }
+
+ @Override
+ public void clearProperties() throws JMSException {
+ message.clearProperties();
+ }
+
+ @Override
+ public boolean propertyExists(final String name) throws JMSException {
+ return message.propertyExists(name);
+ }
+
+ @Override
+ public boolean getBooleanProperty(final String name) throws JMSException {
+ return message.getBooleanProperty(name);
+ }
+
+ @Override
+ public byte getByteProperty(final String name) throws JMSException {
+ return message.getByteProperty(name);
+ }
+
+ @Override
+ public short getShortProperty(final String name) throws JMSException {
+ return message.getShortProperty(name);
+ }
+
+ @Override
+ public int getIntProperty(final String name) throws JMSException {
+ return message.getIntProperty(name);
+ }
+
+ @Override
+ public long getLongProperty(final String name) throws JMSException {
+ return message.getLongProperty(name);
+ }
+
+ @Override
+ public float getFloatProperty(final String name) throws JMSException {
+ return message.getFloatProperty(name);
+ }
+
+ @Override
+ public double getDoubleProperty(final String name) throws JMSException {
+ return message.getDoubleProperty(name);
+ }
+
+ @Override
+ public String getStringProperty(final String name) throws JMSException {
+ return message.getStringProperty(name);
+ }
+
+ @Override
+ public Object getObjectProperty(final String name) throws JMSException {
+ return message.getObjectProperty(name);
+ }
+
+ @Override
+ public Enumeration getPropertyNames() throws JMSException {
+ return message.getPropertyNames();
+ }
+
+ @Override
+ public void setBooleanProperty(final String name, final boolean value) throws JMSException {
+ message.setBooleanProperty(name, value);
+ }
+
+ @Override
+ public void setByteProperty(final String name, final byte value) throws JMSException {
+ message.setByteProperty(name, value);
+ }
+
+ @Override
+ public void setShortProperty(final String name, final short value) throws JMSException {
+ message.setShortProperty(name, value);
+ }
+
+ @Override
+ public void setIntProperty(final String name, final int value) throws JMSException {
+ message.setIntProperty(name, value);
+ }
+
+ @Override
+ public void setLongProperty(final String name, final long value) throws JMSException {
+ message.setLongProperty(name, value);
+ }
+
+ @Override
+ public void setFloatProperty(final String name, final float value) throws JMSException {
+ message.setFloatProperty(name, value);
+ }
+
+ @Override
+ public void setDoubleProperty(final String name, final double value) throws JMSException {
+ message.setDoubleProperty(name, value);
+ }
+
+ @Override
+ public void setStringProperty(final String name, final String value) throws JMSException {
+ message.setStringProperty(name, value);
+ }
+
+ @Override
+ public void setObjectProperty(final String name, final Object value) throws JMSException {
+ message.setObjectProperty(name, value);
+ }
+
+ @Override
+ public void acknowledge() throws JMSException {
+ message.acknowledge();
+ }
+
+ @Override
+ public void clearBody() throws JMSException {
+ message.clearBody();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
new file mode 100644
index 0000000..33118e3
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq.jms2;
+
+import javax.jms.BytesMessage;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.InvalidClientIDException;
+import javax.jms.InvalidClientIDRuntimeException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidDestinationRuntimeException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.InvalidSelectorRuntimeException;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.JMSSecurityException;
+import javax.jms.JMSSecurityRuntimeException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageFormatRuntimeException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageNotWriteableRuntimeException;
+import javax.jms.ObjectMessage;
+import javax.jms.ResourceAllocationException;
+import javax.jms.ResourceAllocationRuntimeException;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.jms.TransactionInProgressException;
+import javax.jms.TransactionInProgressRuntimeException;
+import javax.jms.TransactionRolledBackException;
+import javax.jms.TransactionRolledBackRuntimeException;
+
+/**
+ * Static helpers shared by the JMS 2.0 facade classes of this package:
+ * checked-to-unchecked exception translation and message wrapping.
+ */
+final class JMS2 {
+    private JMS2() {
+        // no-op
+    }
+
+    /**
+     * Translates a checked {@link JMSException} into the unchecked JMS 2.0 exception
+     * of the matching kind, keeping the message, the error code and the cause.
+     *
+     * @param e the exception to translate
+     * @return the matching {@link JMSRuntimeException} subtype, or a plain
+     *         {@link JMSRuntimeException} when no specific subtype matches
+     */
+    public static JMSRuntimeException toRuntimeException(final JMSException e) {
+        if (e instanceof javax.jms.IllegalStateException) {
+            return new IllegalStateRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof InvalidClientIDException) {
+            return new InvalidClientIDRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof InvalidDestinationException) {
+            return new InvalidDestinationRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof InvalidSelectorException) {
+            return new InvalidSelectorRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof JMSSecurityException) {
+            return new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof MessageFormatException) {
+            return new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof MessageNotWriteableException) {
+            return new MessageNotWriteableRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof ResourceAllocationException) {
+            return new ResourceAllocationRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof TransactionInProgressException) {
+            return new TransactionInProgressRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        if (e instanceof TransactionRolledBackException) {
+            return new TransactionRolledBackRuntimeException(e.getMessage(), e.getErrorCode(), e);
+        }
+        return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
+    }
+
+    /**
+     * Wraps a provider message into the wrapper of this package matching its JMS interface.
+     *
+     * @param message10 the message to wrap, may be {@code null}
+     * @param <T>       the type the caller sees the message as
+     * @return {@code null} for {@code null}, the message itself when its class already lives
+     *         in this package, otherwise a new wrapper around it
+     */
+    // NOTE(review): the casts to T only hold when callers bind T to a javax.jms interface
+    // (Message, TextMessage, ...) and not to a provider class - confirm against callers.
+    @SuppressWarnings("unchecked")
+    public static <T extends Message> T wrap(final T message10) {
+        if (message10 == null) {
+            return null;
+        }
+
+        // already wrapped // happens with producer -> context link
+        // but since we can switch the context it is better to ensure we wrap anyway
+        if (message10.getClass().getName().startsWith(JMS2.class.getPackage().getName())) {
+            return message10;
+        }
+
+        // jms -> wrappers
+        if (TextMessage.class.isInstance(message10)) {
+            return (T) new WrappingTextMessage(TextMessage.class.cast(message10));
+        }
+        if (ObjectMessage.class.isInstance(message10)) {
+            return (T) new WrappingObjectMessage(ObjectMessage.class.cast(message10));
+        }
+        if (MapMessage.class.isInstance(message10)) {
+            return (T) new WrappingMapMessage(MapMessage.class.cast(message10));
+        }
+        if (BytesMessage.class.isInstance(message10)) {
+            return (T) new WrappingByteMessage(BytesMessage.class.cast(message10));
+        }
+        if (StreamMessage.class.isInstance(message10)) {
+            return (T) new WrappingStreamMessage(StreamMessage.class.cast(message10));
+        }
+        // Plain Message: wrap it directly. Casting it to DelegateMessage first could never
+        // succeed here, since wrappers of this package were already returned above.
+        return (T) new DelegateMessage(message10);
+    }
+}
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSConsumerImpl.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSConsumerImpl.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSConsumerImpl.java
new file mode 100644
index 0000000..26d0482
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSConsumerImpl.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq.jms2;
+
+import javax.jms.JMSConsumer;
+import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import static org.apache.openejb.resource.activemq.jms2.JMS2.toRuntimeException;
+
+public class JMSConsumerImpl implements JMSConsumer {
+ private final JMSContextImpl context;
+ private final MessageConsumer consumer;
+
+ public JMSConsumerImpl(final JMSContextImpl jmsContext, final MessageConsumer consumer) {
+ this.context = jmsContext;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public String getMessageSelector() {
+ try {
+ return consumer.getMessageSelector();
+ } catch (JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ @Override
+ public MessageListener getMessageListener() throws JMSRuntimeException {
+ try {
+ return consumer.getMessageListener();
+ } catch (JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setMessageListener(final MessageListener listener) throws JMSRuntimeException {
+ try {
+ consumer.setMessageListener(new ContextUpdaterMessageListenerWrapper(context, listener));
+ } catch (JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Message receive() {
+ try {
+ return context.setLastMessage(wrap(consumer.receive()));
+ } catch (JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Message receive(final long timeout) {
+ try {
+ return context.setLastMessage(wrap(consumer.receive(timeout)));
+ } catch (JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Message receiveNoWait() {
+ try {
+ return context.setLastMessage(wrap(consumer.receiveNoWait()));
+ } catch (JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ consumer.close();
+ } catch (JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ @Override
+ public <T> T receiveBody(final Class<T> c) {
+ try {
+ final Message message = wrap(consumer.receive());
+ context.setLastMessage(message);
+ return message == null ? null : message.getBody(c);
+ } catch (JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ @Override
+ public <T> T receiveBody(final Class<T> c, final long timeout) {
+ try {
+ final Message message = wrap(consumer.receive(timeout));
+ context.setLastMessage(message);
+ return message == null ? null : message.getBody(c);
+ } catch (final JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ @Override
+ public <T> T receiveBodyNoWait(final Class<T> c) {
+ try {
+ final Message message = wrap(consumer.receiveNoWait());
+ context.setLastMessage(message);
+ return message == null ? null : message.getBody(c);
+ } catch (JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ private static Message wrap(final Message message) {
+ final Message wrapped = JMS2.wrap(message);
+ try {
+ wrapped.setJMSDeliveryTime(System.currentTimeMillis());
+ } catch (final JMSException e) {
+ // no-op: TODO: investigate if an issue or not in this context
+ }
+ return wrapped;
+ }
+
+ private static final class ContextUpdaterMessageListenerWrapper implements MessageListener {
+ private final JMSContextImpl context;
+ private final MessageListener wrapped;
+
+ private ContextUpdaterMessageListenerWrapper(final JMSContextImpl context, MessageListener wrapped) {
+ this.context = context;
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public void onMessage(final Message message) {
+ final Message wrappedMessage = wrap(message);
+ context.setLastMessage(wrappedMessage);
+ wrapped.onMessage(wrappedMessage);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
new file mode 100644
index 0000000..0a3c276
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq.jms2;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.XAConnection;
+import java.io.Serializable;
+
+import static org.apache.openejb.resource.activemq.jms2.JMS2.toRuntimeException;
+import static org.apache.openejb.resource.activemq.jms2.JMS2.wrap;
+
+/**
+ * {@link JMSContext} implemented on top of a lazily created {@link Connection} and
+ * {@link Session} obtained from the given {@link ConnectionFactory}.
+ * <p>
+ * Checked {@link JMSException}s raised by the underlying objects are rethrown as their
+ * unchecked counterparts. Lazy initialisation and {@link #close()} are guarded by the
+ * monitor of this instance.
+ */
+public class JMSContextImpl implements JMSContext {
+    private final int sessionMode;
+    private final String username;
+    private final String password;
+    private final ConnectionFactory factory;
+    // volatile: read without the lock by the double-checked lazy initialisation in session()
+    private volatile Session session;
+    private boolean autoStart = true;
+    private MessageProducer innerProducer;
+    private boolean xa;
+    private boolean closed;
+    private Connection connection;
+    private volatile Message lastMessagesWaitingAck;
+
+    /**
+     * @param factory     factory used to create the connection (and child contexts)
+     * @param sessionMode session mode used when the session is created
+     * @param user        user name for the connection, {@code null} to connect without credentials
+     * @param pwd         password used together with {@code user}
+     * @param xa          initial XA flag; replaced by the actual connection type once connected
+     */
+    public JMSContextImpl(final ConnectionFactory factory, final int sessionMode, final String user, final String pwd,
+                          final boolean xa) {
+        this.factory = factory;
+        this.sessionMode = sessionMode;
+        this.username = user;
+        this.password = pwd;
+        this.xa = xa;
+    }
+
+    /**
+     * Remembers the last received message so {@link #acknowledge()} can acknowledge it;
+     * only done in {@code CLIENT_ACKNOWLEDGE} mode.
+     *
+     * @param lastMessageReceived the message just received, may be {@code null}
+     * @return the same message, to allow call chaining
+     */
+    Message setLastMessage(final Message lastMessageReceived) {
+        if (sessionMode == CLIENT_ACKNOWLEDGE) {
+            lastMessagesWaitingAck = lastMessageReceived;
+        }
+        return lastMessageReceived;
+    }
+
+    /**
+     * Returns the connection, creating it on first use and updating the XA flag from its type.
+     *
+     * @throws IllegalStateRuntimeException if no connection exists yet and the context is closed
+     */
+    protected synchronized Connection connection() {
+        if (connection == null) {
+            if (closed) {
+                // never open a new connection once closed: nothing would ever close it
+                throw new IllegalStateRuntimeException("Context is closed");
+            }
+            try {
+                connection = username != null ? factory.createConnection(username, password) : factory.createConnection();
+                xa = XAConnection.class.isInstance(connection);
+            } catch (final JMSException e) {
+                throw toRuntimeException(e);
+            }
+        }
+        return connection;
+    }
+
+    /**
+     * Returns the session, creating it on first use (XA session for an XA connection).
+     *
+     * @throws IllegalStateRuntimeException if no session exists yet and the context is closed
+     */
+    protected Session session() {
+        Session current = session;
+        if (current == null) {
+            synchronized (this) {
+                if (closed) {
+                    throw new IllegalStateRuntimeException("Context is closed");
+                }
+                current = session;
+                if (current == null) {
+                    try {
+                        // resolve the connection first: connection() sets xa from the real connection type
+                        final Connection resolved = connection();
+                        if (xa) {
+                            current = XAConnection.class.cast(resolved).createXASession();
+                        } else {
+                            current = resolved.createSession(sessionMode);
+                        }
+                        session = current;
+                    } catch (final JMSException e) {
+                        throw toRuntimeException(e);
+                    }
+                }
+            }
+        }
+        return current;
+    }
+
+    /**
+     * Starts the connection when auto start is enabled.
+     *
+     * @throws IllegalStateRuntimeException if the context is closed
+     */
+    private synchronized void checkAutoStart() throws JMSException {
+        if (closed) {
+            throw new IllegalStateRuntimeException("Context is closed");
+        }
+        if (autoStart) {
+            connection().start();
+        }
+    }
+
+    /** Returns the single destination-less producer shared by all producers of this context. */
+    private synchronized MessageProducer getInnerProducer() throws JMSException {
+        if (innerProducer == null) {
+            innerProducer = session().createProducer(null);
+        }
+        return innerProducer;
+    }
+
+    @Override
+    public void acknowledge() {
+        session(); // fails if the context was closed before a session was created
+        try {
+            final Message toAck = lastMessagesWaitingAck;
+            if (toAck != null) {
+                toAck.acknowledge();
+            }
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    /**
+     * Closes the session and the connection. The context is marked closed and the
+     * connection is closed even when closing the session fails.
+     */
+    @Override
+    public void close() {
+        try {
+            synchronized (this) {
+                closed = true;
+                try {
+                    if (session != null) {
+                        session.close();
+                    }
+                } finally {
+                    if (connection != null) {
+                        connection.close();
+                    }
+                }
+            }
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void commit() {
+        try {
+            session().commit();
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public QueueBrowser createBrowser(final Queue queue) {
+        try {
+            final QueueBrowser browser = session().createBrowser(queue);
+            checkAutoStart();
+            return browser;
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public QueueBrowser createBrowser(final Queue queue, final String messageSelector) {
+        try {
+            final QueueBrowser browser = session().createBrowser(queue, messageSelector);
+            checkAutoStart();
+            return browser;
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() {
+        try {
+            return wrap(session().createBytesMessage());
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSConsumer createConsumer(final Destination destination) {
+        try {
+            final JMSConsumerImpl consumer = new JMSConsumerImpl(this, session().createConsumer(destination));
+            checkAutoStart();
+            return consumer;
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSConsumer createConsumer(final Destination destination, final String messageSelector) {
+        try {
+            final JMSConsumerImpl consumer = new JMSConsumerImpl(this, session().createConsumer(destination, messageSelector));
+            checkAutoStart();
+            return consumer;
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSConsumer createConsumer(final Destination destination, final String messageSelector, final boolean noLocal) {
+        try {
+            final JMSConsumerImpl consumer = new JMSConsumerImpl(this, session().createConsumer(destination, messageSelector, noLocal));
+            checkAutoStart();
+            return consumer;
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a new context from the factory, reusing the credentials of this context.
+     *
+     * @throws JMSRuntimeException if this context is XA
+     */
+    // NOTE(review): the new context opens its own connection instead of sharing this one - confirm intended.
+    @Override
+    public JMSContext createContext(final int sessionMode) {
+        if (xa) {
+            throw new JMSRuntimeException("Illegal call to createContext");
+        }
+        // same credential rule as connection(): only pass them when a user was configured
+        return username != null ? factory.createContext(username, password, sessionMode) : factory.createContext(sessionMode);
+    }
+
+    @Override
+    public JMSConsumer createDurableConsumer(final Topic topic, final String name) {
+        try {
+            final JMSConsumerImpl consumer = new JMSConsumerImpl(this, session().createDurableConsumer(topic, name));
+            checkAutoStart();
+            return consumer;
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSConsumer createDurableConsumer(final Topic topic, final String name, final String messageSelector, final boolean noLocal) {
+        try {
+            final JMSConsumerImpl consumer = new JMSConsumerImpl(this, session().createDurableConsumer(topic, name, messageSelector, noLocal));
+            checkAutoStart();
+            return consumer;
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public MapMessage createMapMessage() {
+        try {
+            return wrap(session().createMapMessage());
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public Message createMessage() {
+        try {
+            return wrap(session().createMessage());
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() {
+        try {
+            return wrap(session().createObjectMessage());
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage(final Serializable object) {
+        try {
+            return wrap(session().createObjectMessage(object));
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSProducer createProducer() {
+        try {
+            return new JMSProducerImpl(this, getInnerProducer());
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public Queue createQueue(final String queueName) {
+        try {
+            return session().createQueue(queueName);
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSConsumer createSharedConsumer(final Topic topic, final String sharedSubscriptionName) {
+        try {
+            final JMSConsumerImpl consumer = new JMSConsumerImpl(this, session().createSharedConsumer(topic, sharedSubscriptionName));
+            checkAutoStart();
+            return consumer;
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSConsumer createSharedConsumer(final Topic topic, final String sharedSubscriptionName, final String messageSelector) {
+        try {
+            final JMSConsumerImpl consumer = new JMSConsumerImpl(this, session().createSharedConsumer(topic, sharedSubscriptionName, messageSelector));
+            checkAutoStart();
+            return consumer;
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSConsumer createSharedDurableConsumer(final Topic topic, final String name) {
+        try {
+            final JMSConsumerImpl consumer = new JMSConsumerImpl(this, session().createSharedDurableConsumer(topic, name));
+            checkAutoStart();
+            return consumer;
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public JMSConsumer createSharedDurableConsumer(final Topic topic, final String name, final String messageSelector) {
+        try {
+            final JMSConsumerImpl consumer = new JMSConsumerImpl(this, session().createSharedDurableConsumer(topic, name, messageSelector));
+            checkAutoStart();
+            return consumer;
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() {
+        try {
+            return wrap(session().createStreamMessage());
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public TemporaryQueue createTemporaryQueue() {
+        try {
+            return session().createTemporaryQueue();
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public TemporaryTopic createTemporaryTopic() {
+        try {
+            return session().createTemporaryTopic();
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public TextMessage createTextMessage() {
+        try {
+            return wrap(session().createTextMessage());
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public TextMessage createTextMessage(final String text) {
+        try {
+            return wrap(session().createTextMessage(text));
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public Topic createTopic(final String topicName) {
+        try {
+            return session().createTopic(topicName);
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean getAutoStart() {
+        return autoStart;
+    }
+
+    @Override
+    public String getClientID() {
+        try {
+            return connection().getClientID();
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public ExceptionListener getExceptionListener() {
+        try {
+            return connection().getExceptionListener();
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public ConnectionMetaData getMetaData() {
+        try {
+            return connection().getMetaData();
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public int getSessionMode() {
+        return sessionMode;
+    }
+
+    @Override
+    public boolean getTransacted() {
+        try {
+            return session().getTransacted();
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void recover() {
+        try {
+            session().recover();
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void rollback() {
+        try {
+            session().rollback();
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void setAutoStart(final boolean autoStart) {
+        this.autoStart = autoStart;
+    }
+
+    @Override
+    public void setClientID(final String clientID) {
+        if (xa) {
+            throw new JMSRuntimeException("Illegal call to setClientID");
+        }
+        try {
+            connection().setClientID(clientID);
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void setExceptionListener(final ExceptionListener listener) {
+        if (xa) {
+            throw new JMSRuntimeException("Illegal call to setExceptionListener");
+        }
+        try {
+            connection().setExceptionListener(listener);
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void start() {
+        try {
+            connection().start();
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (xa) {
+            throw new JMSRuntimeException("Illegal call to stop");
+        }
+        try { // TODO: ref counting
+            connection().stop();
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void unsubscribe(final String name) {
+        try {
+            session().unsubscribe(name);
+        } catch (final JMSException e) {
+            throw toRuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
new file mode 100644
index 0000000..9969639
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq.jms2;
+
+import org.apache.xbean.propertyeditor.PropertyEditorException;
+import org.apache.xbean.propertyeditor.PropertyEditors;
+
+import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatRuntimeException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.openejb.resource.activemq.jms2.JMS2.toRuntimeException;
+import static org.apache.openejb.resource.activemq.jms2.JMS2.wrap;
+
+class JMSProducerImpl implements JMSProducer {
+ private final JMSContextImpl context;
+ private final MessageProducer producer;
+
+ private final Map<String, Object> properties = new HashMap<>();
+
+ private volatile CompletionListener completionListener;
+
+ private Destination jmsHeaderReplyTo;
+ private String jmsHeaderCorrelationID;
+ private byte[] jmsHeaderCorrelationIDAsBytes;
+ private String jmsHeaderType;
+
+ JMSProducerImpl(final JMSContextImpl jmsContext, final MessageProducer innerProducer) {
+ this.context = jmsContext;
+ this.producer = innerProducer;
+ }
+
+ private <T> T getProperty(final String key, final Class<T> type) {
+ final Object val = properties.get(key);
+ if (val == null || type.isInstance(val)) {
+ return type.cast(val);
+ }
+ try {
+ return type.cast(PropertyEditors.getValue(type, val.toString()));
+ } catch (final PropertyEditorException pee) {
+ throw new MessageFormatRuntimeException(pee.getMessage());
+ }
+ }
+
+ @Override
+ public JMSProducer send(final Destination destination, final Message message) {
+ if (message == null) {
+ throw new MessageFormatRuntimeException("null message");
+ }
+
+ try {
+ if (jmsHeaderCorrelationID != null) {
+ message.setJMSCorrelationID(jmsHeaderCorrelationID);
+ }
+ if (jmsHeaderCorrelationIDAsBytes != null && jmsHeaderCorrelationIDAsBytes.length > 0) {
+ message.setJMSCorrelationIDAsBytes(jmsHeaderCorrelationIDAsBytes);
+ }
+ if (jmsHeaderReplyTo != null) {
+ message.setJMSReplyTo(jmsHeaderReplyTo);
+ }
+ if (jmsHeaderType != null) {
+ message.setJMSType(jmsHeaderType);
+ }
+
+ setProperties(message);
+ if (completionListener != null) {
+ producer.send(destination, message, completionListener);
+ } else {
+ producer.send(destination, message);
+ }
+ } catch (final JMSException e) {
+ throw toRuntimeException(e);
+ }
+ return this;
+ }
+
+ private void setProperties(final Message message) throws JMSException {
+ for (final Map.Entry<String, Object> entry : properties.entrySet()) {
+ message.setObjectProperty(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public JMSProducer send(final Destination destination, final String body) {
+ send(destination, wrap(context.createTextMessage(body)));
+ return this;
+ }
+
+ @Override
+ public JMSProducer send(final Destination destination, final Map<String, Object> body) {
+ final MapMessage message = wrap(context.createMapMessage());
+ if (body != null) {
+ try {
+ for (final Map.Entry<String, Object> entry : body.entrySet()) {
+ final String name = entry.getKey();
+ final Object v = entry.getValue();
+ if (v instanceof String) {
+ message.setString(name, (String) v);
+ } else if (v instanceof Long) {
+ message.setLong(name, (Long) v);
+ } else if (v instanceof Double) {
+ message.setDouble(name, (Double) v);
+ } else if (v instanceof Integer) {
+ message.setInt(name, (Integer) v);
+ } else if (v instanceof Character) {
+ message.setChar(name, (Character) v);
+ } else if (v instanceof Short) {
+ message.setShort(name, (Short) v);
+ } else if (v instanceof Boolean) {
+ message.setBoolean(name, (Boolean) v);
+ } else if (v instanceof Float) {
+ message.setFloat(name, (Float) v);
+ } else if (v instanceof Byte) {
+ message.setByte(name, (Byte) v);
+ } else if (v instanceof byte[]) {
+ byte[] array = (byte[]) v;
+ message.setBytes(name, array, 0, array.length);
+ } else {
+ message.setObject(name, v);
+ }
+ }
+ } catch (final JMSException e) {
+ throw new MessageFormatRuntimeException(e.getMessage());
+ }
+ }
+ send(destination, message);
+ return this;
+ }
+
+ @Override
+ public JMSProducer send(final Destination destination, final byte[] body) {
+ final BytesMessage message = wrap(context.createBytesMessage());
+ if (body != null) {
+ try {
+ message.writeBytes(body);
+ } catch (final JMSException e) {
+ throw new MessageFormatRuntimeException(e.getMessage());
+ }
+ }
+ send(destination, message);
+ return this;
+ }
+
+ @Override
+ public JMSProducer send(final Destination destination, final Serializable body) {
+ final ObjectMessage message = wrap(context.createObjectMessage(body));
+ send(destination, message);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setDisableMessageID(final boolean value) {
+ try {
+ producer.setDisableMessageID(value);
+ } catch (final JMSException e) {
+ throw toRuntimeException(e);
+ }
+ return this;
+ }
+
+ @Override
+ public boolean getDisableMessageID() {
+ try {
+ return producer.getDisableMessageID();
+ } catch (final JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSProducer setDisableMessageTimestamp(final boolean value) {
+ try {
+ producer.setDisableMessageTimestamp(value);
+ } catch (final JMSException e) {
+ throw toRuntimeException(e);
+ }
+ return this;
+ }
+
+ @Override
+ public boolean getDisableMessageTimestamp() {
+ try {
+ return producer.getDisableMessageTimestamp();
+ } catch (final JMSException e) {
+ throw toRuntimeException(e);
+ }
+ }
+
+ @Override
+ public JMSProducer setDeliveryMode(final int deliveryMode) {
+ try {
+ producer.setDeliveryMode(deliveryMode);
+ } catch (final JMSException e) {
+ final JMSRuntimeException e2 = new JMSRuntimeException(e.getMessage());
+ e2.initCause(e);
+ throw e2;
+ }
+ return this;
+ }
+
+ @Override
+ public int getDeliveryMode() {
+ try {
+ return producer.getDeliveryMode();
+ } catch (final JMSException e) {
+ final JMSRuntimeException e2 = new JMSRuntimeException(e.getMessage());
+ e2.initCause(e);
+ throw e2;
+ }
+ }
+
+ @Override
+ public JMSProducer setPriority(final int priority) {
+ try {
+ producer.setPriority(priority);
+ } catch (final JMSException e) {
+ final JMSRuntimeException e2 = new JMSRuntimeException(e.getMessage());
+ e2.initCause(e);
+ throw e2;
+ }
+ return this;
+ }
+
+ @Override
+ public int getPriority() {
+ try {
+ return producer.getPriority();
+ } catch (final JMSException e) {
+ final JMSRuntimeException e2 = new JMSRuntimeException(e.getMessage());
+ e2.initCause(e);
+ throw e2;
+ }
+ }
+
+ @Override
+ public JMSProducer setTimeToLive(final long timeToLive) {
+ try {
+ producer.setTimeToLive(timeToLive);
+ return this;
+ } catch (final JMSException e) {
+ final JMSRuntimeException e2 = new JMSRuntimeException(e.getMessage());
+ e2.initCause(e);
+ throw e2;
+ }
+ }
+
+ @Override
+ public long getTimeToLive() {
+ try {
+ return producer.getTimeToLive();
+ } catch (final JMSException e) {
+ final JMSRuntimeException e2 = new JMSRuntimeException(e.getMessage());
+ e2.initCause(e);
+ throw e2;
+ }
+ }
+
+ @Override
+ public JMSProducer setDeliveryDelay(final long deliveryDelay) {
+ try {
+ producer.setDeliveryDelay(deliveryDelay);
+ return this;
+ } catch (final JMSException e) {
+ JMSRuntimeException e2 = new JMSRuntimeException(e.getMessage());
+ e2.initCause(e);
+ throw e2;
+ }
+ }
+
+ @Override
+ public long getDeliveryDelay() {
+ try {
+ return producer.getDeliveryDelay();
+ } catch (final Exception ignored) {
+ // no-op
+ }
+ return 0;
+ }
+
+ @Override
+ public JMSProducer setAsync(final CompletionListener completionListener) {
+ this.completionListener = completionListener;
+ return this;
+ }
+
+ @Override
+ public CompletionListener getAsync() {
+ return completionListener;
+ }
+
+ @Override
+ public JMSProducer setProperty(final String name, final boolean value) {
+ validName(name);
+ properties.put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(final String name, final byte value) {
+ validName(name);
+ properties.put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(final String name, final short value) {
+ validName(name);
+ properties.put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(final String name, final int value) {
+ validName(name);
+ properties.put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(final String name, final long value) {
+ validName(name);
+ properties.put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(final String name, final float value) {
+ validName(name);
+ properties.put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(final String name, final double value) {
+ validName(name);
+ properties.put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer setProperty(final String name, final String value) {
+ validName(name);
+ properties.put(name, value);
+ return this;
+ }
+
+ @Override
+ public JMSProducer clearProperties() {
+ properties.clear();
+ return this;
+ }
+
+ @Override
+ public boolean propertyExists(final String name) {
+ return properties.containsKey(name);
+ }
+
+ @Override
+ public JMSProducer setProperty(final String name, final Object value) {
+ validName(name);
+ if (value != null && !Boolean.class.isInstance(value) && !Byte.class.isInstance(value) && !Character.class.isInstance(value)
+ && !Short.class.isInstance(value) && !Integer.class.isInstance(value) && !Long.class.isInstance(value)
+ && !Float.class.isInstance(value) && !Double.class.isInstance(value) && !String.class.isInstance(value)
+ && !byte[].class.isInstance(value)) {
+ throw new MessageFormatRuntimeException("Unsupported type: " + value);
+ }
+ properties.put(name, value);
+ return this;
+ }
+
+ @Override
+ public boolean getBooleanProperty(final String name) {
+ return getProperty(name, Boolean.class);
+ }
+
+ @Override
+ public byte getByteProperty(final String name) {
+ return getProperty(name, Byte.class);
+ }
+
+ @Override
+ public short getShortProperty(final String name) {
+ return getProperty(name, Short.class);
+ }
+
+ @Override
+ public int getIntProperty(final String name) {
+ return getProperty(name, Integer.class);
+ }
+
+ @Override
+ public long getLongProperty(final String name) {
+ return getProperty(name, Long.class);
+ }
+
+ @Override
+ public float getFloatProperty(final String name) {
+ return getProperty(name, Float.class);
+ }
+
+ @Override
+ public double getDoubleProperty(final String name) {
+ return getProperty(name, Double.class);
+ }
+
+ @Override
+ public String getStringProperty(final String name) {
+ return getProperty(name, String.class);
+ }
+
+ @Override
+ public Object getObjectProperty(final String name) {
+ return getProperty(name, Object.class);
+ }
+
+ @Override
+ public Set<String> getPropertyNames() {
+ return new HashSet<>(properties.keySet());
+ }
+
+ @Override
+ public JMSProducer setJMSCorrelationIDAsBytes(final byte[] correlationID) {
+ if (correlationID == null || correlationID.length == 0) {
+ throw new JMSRuntimeException("Please specify a non-zero length byte[]");
+ }
+ jmsHeaderCorrelationIDAsBytes = Arrays.copyOf(correlationID, correlationID.length);
+ return this;
+ }
+
+ @Override
+ public byte[] getJMSCorrelationIDAsBytes() {
+ return Arrays.copyOf(jmsHeaderCorrelationIDAsBytes, jmsHeaderCorrelationIDAsBytes.length);
+ }
+
+ @Override
+ public JMSProducer setJMSCorrelationID(final String correlationID) {
+ jmsHeaderCorrelationID = correlationID;
+ return this;
+ }
+
+ @Override
+ public String getJMSCorrelationID() {
+ return jmsHeaderCorrelationID;
+ }
+
+ @Override
+ public JMSProducer setJMSType(final String type) {
+ jmsHeaderType = type;
+ return this;
+ }
+
+ @Override
+ public String getJMSType() {
+ return jmsHeaderType;
+ }
+
+ @Override
+ public JMSProducer setJMSReplyTo(final Destination replyTo) {
+ jmsHeaderReplyTo = replyTo;
+ return this;
+ }
+
+ @Override
+ public Destination getJMSReplyTo() {
+ return jmsHeaderReplyTo;
+ }
+
+ private void validName(final String name) {
+ if (name == null || name.isEmpty()) {
+ throw new IllegalArgumentException("name can't be blank");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tomee/blob/1bf768b7/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
new file mode 100644
index 0000000..ca0116e
--- /dev/null
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq.jms2;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IdGenerator;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.JMSException;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+ /**
+  * {@link ActiveMQConnection} subclass that adds the single-argument and no-argument
+  * {@code createSession} variants on top of the two-argument one inherited from the parent,
+  * and rejects the shared connection consumer factory methods.
+  */
+ public class TomEEConnection extends ActiveMQConnection {
+     /** Passes every argument straight to the {@link ActiveMQConnection} constructor. */
+     public TomEEConnection(final Transport transport, final IdGenerator clientIdGenerator,
+                            final IdGenerator connectionIdGenerator, final JMSStatsImpl factoryStats) throws Exception {
+         super(transport, clientIdGenerator, connectionIdGenerator, factoryStats);
+     }
+
+     /**
+      * Creates a session for the given mode; the session is requested as transacted only when
+      * the mode is {@link Session#SESSION_TRANSACTED}.
+      */
+     @Override
+     public Session createSession(final int sessionMode) throws JMSException {
+         final boolean transacted = Session.SESSION_TRANSACTED == sessionMode;
+         return super.createSession(transacted, sessionMode);
+     }
+
+     /** Same as {@link #createSession(int)} with {@link Session#AUTO_ACKNOWLEDGE}. */
+     @Override
+     public Session createSession() throws JMSException {
+         return createSession(Session.AUTO_ACKNOWLEDGE);
+     }
+
+     /** Always fails with an {@link IllegalStateException}. */
+     @Override
+     public ConnectionConsumer createSharedDurableConnectionConsumer(final Topic topic, final String subscriptionName,
+                                                                     final String messageSelector,
+                                                                     final ServerSessionPool sessionPool,
+                                                                     final int maxMessages) throws JMSException {
+         throw new IllegalStateException("Not allowed in a RA");
+     }
+
+     /** Always fails with an {@link IllegalStateException}. */
+     @Override
+     public ConnectionConsumer createSharedConnectionConsumer(final Topic topic, final String subscriptionName,
+                                                              final String messageSelector,
+                                                              final ServerSessionPool sessionPool,
+                                                              final int maxMessages) throws JMSException {
+         throw new IllegalStateException("Not allowed in a RA");
+     }
+ }