You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by cs...@apache.org on 2017/08/03 14:35:14 UTC

[02/15] karaf git commit: [KARAF-5131] XA + JMS support

http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnectionFactory.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnectionFactory.java
deleted file mode 100644
index ba36c1c..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnectionFactory.java
+++ /dev/null
@@ -1,538 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-import org.apache.commons.pool2.KeyedPooledObjectFactory;
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * A JMS provider which pools Connection, Session and MessageProducer instances
- * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
- * <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessageListenerContainer</a>.
- * Connections, sessions and producers are returned to a pool after use so that they can be reused later
- * without having to undergo the cost of creating them again.
- *
- * <b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
- * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
- * are expensive to create and can remain idle at minimal cost. Consumers, on the other hand, are usually
- * just created at startup and left active, handling incoming messages as they come. When a consumer is
- * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
- * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
- * where they'll get held until the consumer is active again.
- *
- * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
- * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
- * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
- * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
- *
- * Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
- * pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
- * be used when configuring this optional feature. Eviction runs contend with client threads for access
- * to objects in the pool, so if they run too frequently performance issues may result. The idle object
- * eviction thread may be configured using the {@link PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method.  By
- * default the value is -1 which means no eviction thread will be run.  Set to a non-negative value to
- * configure the idle eviction thread to run.
- */
-public class PooledConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
-
-    private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
-
-    private final AtomicBoolean stopped = new AtomicBoolean(false);
-    private GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
-
-    private final ConnectionFactory connectionFactory;
-
-    private int maximumActiveSessionPerConnection = 500;
-    private int idleTimeout = 30 * 1000;
-    private boolean blockIfSessionPoolIsFull = true;
-    private long blockIfSessionPoolIsFullTimeout = -1L;
-    private long expiryTimeout = 0L;
-    private boolean createConnectionOnStartup = true;
-    private boolean useAnonymousProducers = true;
-
-    public PooledConnectionFactory(ConnectionFactory connectionFactory) {
-        this.connectionFactory = Objects.requireNonNull(connectionFactory, "connectionFactory must be set");
-    }
-
-    public void initConnectionsPool() {
-        if (this.connectionsPool == null) {
-            this.connectionsPool = new GenericKeyedObjectPool<>(
-                    new KeyedPooledObjectFactory<ConnectionKey, ConnectionPool>() {
-                        @Override
-                        public void activateObject(ConnectionKey key, PooledObject<ConnectionPool> connection) throws Exception {
-                        }
-                        @Override
-                        public void destroyObject(ConnectionKey key, PooledObject<ConnectionPool> connection) throws Exception {
-                            try {
-                                if (LOG.isTraceEnabled()) {
-                                    LOG.trace("Destroying connection: {}", connection);
-                                }
-                                connection.getObject().close();
-                            } catch (Exception e) {
-                                LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
-                            }
-                        }
-                        @Override
-                        public PooledObject<ConnectionPool> makeObject(ConnectionKey key) throws Exception {
-                            Connection delegate = createConnection(key);
-
-                            ConnectionPool connection = createConnectionPool(delegate);
-                            connection.setIdleTimeout(getIdleTimeout());
-                            connection.setExpiryTimeout(getExpiryTimeout());
-                            connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
-                            connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
-                            if (isBlockIfSessionPoolIsFull() && getBlockIfSessionPoolIsFullTimeout() > 0) {
-                                connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout());
-                            }
-                            connection.setUseAnonymousProducers(isUseAnonymousProducers());
-
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Created new connection: {}", connection);
-                            }
-
-                            return new DefaultPooledObject<>(connection);
-                        }
-
-                        @Override
-                        public void passivateObject(ConnectionKey key, PooledObject<ConnectionPool> connection) throws Exception {
-                        }
-
-                        @Override
-                        public boolean validateObject(ConnectionKey key, PooledObject<ConnectionPool> connection) {
-                            if (connection != null && connection.getObject() != null && connection.getObject().expiredCheck()) {
-                                if (LOG.isTraceEnabled()) {
-                                    LOG.trace("Connection has expired: {} and will be destroyed", connection);
-                                }
-
-                                return false;
-                            }
-
-                            return true;
-                        }
-                    });
-
-            // Set max idle (not max active) since our connections always idle in the pool.
-            this.connectionsPool.setMaxIdlePerKey(1);
-
-            // We always want our validate method to control when idle objects are evicted.
-            this.connectionsPool.setTestOnBorrow(true);
-            this.connectionsPool.setTestWhileIdle(true);
-        }
-    }
-
-    /**
-     * @return the currently configured ConnectionFactory used to create the pooled Connections.
-     */
-    public ConnectionFactory getConnectionFactory() {
-        return connectionFactory;
-    }
-    @Override
-    public Connection createConnection() throws JMSException {
-        return createConnection(null, null);
-    }
-
-    @Override
-    public synchronized Connection createConnection(String userName, String password) throws JMSException {
-        if (stopped.get()) {
-            LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
-            return null;
-        }
-
-        ConnectionPool connection = null;
-        ConnectionKey key = new ConnectionKey(userName, password);
-
-        // This will either return an existing non-expired ConnectionPool or it
-        // will create a new one to meet the demand.
-        if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) {
-            try {
-                // we want borrowObject to return the one we added.
-                connectionsPool.setLifo(true);
-                connectionsPool.addObject(key);
-            } catch (Exception e) {
-                throw createJmsException("Error while attempting to add new Connection to the pool", e);
-            }
-        } else {
-            // now we want the oldest one in the pool.
-            connectionsPool.setLifo(false);
-        }
-
-        try {
-
-            // We can race against other threads returning the connection when there is an
-            // expiration or idle timeout.  We keep pulling out ConnectionPool instances until
-            // we win and get a non-closed instance and then increment the reference count
-            // under lock to prevent another thread from triggering an expiration check and
-            // pulling the rug out from under us.
-            while (true) {
-                ConnectionPool cp = connectionsPool.borrowObject(key);
-                synchronized (cp) {
-                    if (cp.getConnection() != null) {
-                        cp.incrementReferenceCount();
-                        connection = cp;
-                        break;
-                    }
-
-                    // Return the bad one to the pool and let it get destroyed as normal.
-                    connectionsPool.returnObject(key, cp);
-                }
-            }
-        } catch (Exception e) {
-            throw createJmsException("Error while attempting to retrieve a connection from the pool", e);
-        }
-
-        try {
-            connectionsPool.returnObject(key, connection);
-        } catch (Exception e) {
-            throw createJmsException("Error when returning connection to the pool", e);
-        }
-
-        return newPooledConnection(connection);
-    }
-
-    protected Connection newPooledConnection(ConnectionPool connection) {
-        return new PooledConnection(connection);
-    }
-
-    private JMSException createJmsException(String msg, Exception cause) {
-        JMSException exception = new JMSException(msg);
-        exception.setLinkedException(cause);
-        exception.initCause(cause);
-        return exception;
-    }
-
-    protected Connection createConnection(ConnectionKey key) throws JMSException {
-        if (key.getUserName() == null && key.getPassword() == null) {
-            return connectionFactory.createConnection();
-        } else {
-            return connectionFactory.createConnection(key.getUserName(), key.getPassword());
-        }
-    }
-
-    public void start() {
-        LOG.debug("Starting the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
-        stopped.set(false);
-        if (isCreateConnectionOnStartup()) {
-            try {
-                // warm the pool by creating a connection during startup
-                createConnection();
-            } catch (JMSException e) {
-                LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e);
-            }
-        }
-    }
-
-    public void stop() {
-        if (stopped.compareAndSet(false, true)) {
-            LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
-                    connectionsPool != null ? connectionsPool.getNumActive() : 0);
-            try {
-                if (connectionsPool != null) {
-                    connectionsPool.close();
-                }
-            } catch (Exception e) {
-            }
-        }
-    }
-
-    /**
-     * Clears all connections from the pool.  Each connection that is currently in the pool is
-     * closed and removed from the pool.  A new connection will be created on the next call to
-     * {@link #createConnection()}.  Care should be taken when using this method as Connections that
-     * are in use by clients will be closed.
-     */
-    public void clear() {
-
-        if (stopped.get()) {
-            return;
-        }
-
-        getConnectionsPool().clear();
-    }
-
-    /**
-     * Returns the currently configured maximum number of sessions a pooled Connection will
-     * create before it either blocks or throws an exception when a new session is requested,
-     * depending on configuration.
-     *
-     * @return the number of session instances that can be taken from a pooled connection.
-     */
-    public int getMaximumActiveSessionPerConnection() {
-        return maximumActiveSessionPerConnection;
-    }
-
-    /**
-     * Sets the maximum number of active sessions per connection
-     *
-     * @param maximumActiveSessionPerConnection
-     *      The maximum number of active session per connection in the pool.
-     */
-    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
-        this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
-    }
-
-    /**
-     * Controls the behavior of the internal session pool. By default the call to
-     * Connection.getSession() will block if the session pool is full.  If the
-     * argument false is given, it will change the default behavior and instead the
-     * call to getSession() will throw a JMSException.
-     *
-     * The size of the session pool is controlled by the
-     * {@link #setMaximumActiveSessionPerConnection(int) maximumActiveSessionPerConnection} property.
-     *
-     * @param block - if true, the call to getSession() blocks if the pool is full
-     * until a session object is available.  defaults to true.
-     */
-    public void setBlockIfSessionPoolIsFull(boolean block) {
-        this.blockIfSessionPoolIsFull = block;
-    }
-
-    /**
-     * Returns whether a pooled Connection will enter a blocked state or will throw an Exception
-     * once the maximum number of sessions has been borrowed from the Session Pool.
-     *
-     * @return true if the pooled Connection createSession method will block when the limit is hit.
-     * @see #setBlockIfSessionPoolIsFull(boolean)
-     */
-    public boolean isBlockIfSessionPoolIsFull() {
-        return this.blockIfSessionPoolIsFull;
-    }
-
-    /**
-     * Returns the maximum number of pooled Connections that this factory will allow before it
-     * begins to return connections from the pool on calls to {@link #createConnection()}.
-     *
-     * @return the maxConnections that will be created for this pool.
-     */
-    public int getMaxConnections() {
-        return getConnectionsPool().getMaxIdlePerKey();
-    }
-
-    /**
-     * Sets the maximum number of pooled Connections (defaults to one).  Each call to
-     * {@link #createConnection()} will result in a new Connection being created up to the max
-     * connections value.
-     *
-     * @param maxConnections the maxConnections to set
-     */
-    public void setMaxConnections(int maxConnections) {
-        getConnectionsPool().setMaxIdlePerKey(maxConnections);
-    }
-
-    /**
-     * Gets the Idle timeout value applied to new Connection's that are created by this pool.
-     * <p/>
-     * The idle timeout is used to determine if a Connection instance has sat too long in the pool unused
-     * and if so is closed and removed from the pool.  The default value is 30 seconds.
-     *
-     * @return idle timeout value (milliseconds)
-     */
-    public int getIdleTimeout() {
-        return idleTimeout;
-    }
-
-    /**
-     * Sets the idle timeout  value for Connection's that are created by this pool in Milliseconds,
-     * defaults to 30 seconds.
-     * <p/>
-     * For a Connection that is in the pool but has no current users the idle timeout determines how
-     * long the Connection can live before it is eligible for removal from the pool.  Normally the
-     * connections are tested when an attempt to check one out occurs so a Connection instance can sit
-     * in the pool much longer than its idle timeout if connections are used infrequently.
-     *
-     * @param idleTimeout
-     *      The maximum time a pooled Connection can sit unused before it is eligible for removal.
-     */
-    public void setIdleTimeout(int idleTimeout) {
-        this.idleTimeout = idleTimeout;
-    }
-
-    /**
-     * allow connections to expire, irrespective of load or idle time. This is useful with failover
-     * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
-     *
-     * @param expiryTimeout non zero in milliseconds
-     */
-    public void setExpiryTimeout(long expiryTimeout) {
-        this.expiryTimeout = expiryTimeout;
-    }
-
-    /**
-     * @return the configured expiration timeout for connections in the pool.
-     */
-    public long getExpiryTimeout() {
-        return expiryTimeout;
-    }
-
-    /**
-     * @return true if a Connection is created immediately on a call to {@link #start()}.
-     */
-    public boolean isCreateConnectionOnStartup() {
-        return createConnectionOnStartup;
-    }
-
-    /**
-     * Whether to create a connection on starting this {@link PooledConnectionFactory}.
-     * <p/>
-     * This can be used to warm-up the pool on startup. Note that any exception that
-     * happens during startup is logged at WARN level and ignored.
-     *
-     * @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
-     */
-    public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
-        this.createConnectionOnStartup = createConnectionOnStartup;
-    }
-
-    /**
-     * Should Sessions use one anonymous producer for all producer requests or should a new
-     * MessageProducer be created for each request to create a producer object, default is true.
-     *
-     * When enabled the session only needs to allocate one MessageProducer for all requests and
-     * the MessageProducer#send(destination, message) method can be used.  Normally this is the
-     * right thing to do however it does result in the Broker not showing the producers per
-     * destination.
-     *
-     * @return true if a PooledSession will use only a single anonymous message producer instance.
-     */
-    public boolean isUseAnonymousProducers() {
-        return this.useAnonymousProducers;
-    }
-
-    /**
-     * Sets whether a PooledSession uses only one anonymous MessageProducer instance or creates
-     * a new MessageProducer for each call to create a MessageProducer.
-     *
-     * @param value
-     *      Boolean value that configures whether anonymous producers are used.
-     */
-    public void setUseAnonymousProducers(boolean value) {
-        this.useAnonymousProducers = value;
-    }
-
-    /**
-     * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
-     *
-     * @return this factory's pool of ConnectionPool instances.
-     */
-    protected GenericKeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
-        initConnectionsPool();
-        return this.connectionsPool;
-    }
-
-    /**
-     * Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread.
-     * When non-positive, no idle object eviction thread will be run, and Connections will only be
-     * checked on borrow to determine if they have sat idle for too long or have failed for some
-     * other reason.
-     * <p/>
-     * By default this value is set to -1 and no expiration thread ever runs.
-     *
-     * @param timeBetweenExpirationCheckMillis
-     *      The time to wait between runs of the idle Connection eviction thread.
-     */
-    public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
-        getConnectionsPool().setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
-    }
-
-    /**
-     * @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
-     */
-    public long getTimeBetweenExpirationCheckMillis() {
-        return getConnectionsPool().getTimeBetweenEvictionRunsMillis();
-    }
-
-    /**
-     * @return the number of Connections currently in the Pool
-     */
-    public int getNumConnections() {
-        return getConnectionsPool().getNumIdle();
-    }
-
-    /**
-     * Delegate that creates each instance of a ConnectionPool object.  Subclasses can override
-     * this method to customize the type of connection pool returned.
-     *
-     * @param connection
-     *
-     * @return instance of a new ConnectionPool.
-     */
-    protected ConnectionPool createConnectionPool(Connection connection) {
-        return new ConnectionPool(connection);
-    }
-
-    /**
-     * Returns the timeout to use for blocking creating new sessions
-     *
-     * @return the configured timeout used when blocking on the creation of new sessions.
-     * @see #setBlockIfSessionPoolIsFull(boolean)
-     */
-    public long getBlockIfSessionPoolIsFullTimeout() {
-        return blockIfSessionPoolIsFullTimeout;
-    }
-
-    /**
-     * Controls the behavior of the internal session pool. By default the call to
-     * Connection.getSession() will block if the session pool is full.  This setting
-     * will affect how long it blocks and throws an exception after the timeout.
-     *
-     * The size of the session pool is controlled by the
-     * {@link #setMaximumActiveSessionPerConnection(int) maximumActiveSessionPerConnection} property.
-     *
-     * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull
-     * property
-     *
-     * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFull is true,
-     *                                        then use this setting to configure how long to block before retry
-     */
-    public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
-        this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout;
-    }
-
-    @Override
-    public QueueConnection createQueueConnection() throws JMSException {
-        return (QueueConnection) createConnection();
-    }
-
-    @Override
-    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
-        return (QueueConnection) createConnection(userName, password);
-    }
-
-    @Override
-    public TopicConnection createTopicConnection() throws JMSException {
-        return (TopicConnection) createConnection();
-    }
-
-    @Override
-    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
-        return (TopicConnection) createConnection(userName, password);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledMessageConsumer.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledMessageConsumer.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledMessageConsumer.java
deleted file mode 100644
index 9986513..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledMessageConsumer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-
-/**
- * A {@link MessageConsumer} which was created by {@link PooledSession}.
- */
-public class PooledMessageConsumer implements MessageConsumer {
-
-    private final PooledSession session;
-    private final MessageConsumer delegate;
-
-    /**
-     * Wraps the message consumer.
-     *
-     * @param session  the pooled session
-     * @param delegate the created consumer to wrap
-     */
-    public PooledMessageConsumer(PooledSession session, MessageConsumer delegate) {
-        this.session = session;
-        this.delegate = delegate;
-    }
-
-    public void close() throws JMSException {
-        // ensure session removes consumer as it's closed now
-        session.onConsumerClose(delegate);
-        delegate.close();
-    }
-
-    public MessageListener getMessageListener() throws JMSException {
-        return delegate.getMessageListener();
-    }
-
-    public String getMessageSelector() throws JMSException {
-        return delegate.getMessageSelector();
-    }
-
-    public Message receive() throws JMSException {
-        return delegate.receive();
-    }
-
-    public Message receive(long timeout) throws JMSException {
-        return delegate.receive(timeout);
-    }
-
-    public Message receiveNoWait() throws JMSException {
-        return delegate.receiveNoWait();
-    }
-
-    public void setMessageListener(MessageListener listener) throws JMSException {
-        delegate.setMessageListener(listener);
-    }
-
-    public String toString() {
-        return delegate.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledProducer.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledProducer.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledProducer.java
deleted file mode 100644
index 53fdeba..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledProducer.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-
-/**
- * A pooled {@link MessageProducer}
- */
-public class PooledProducer implements MessageProducer {
-
-    private final MessageProducer messageProducer;
-    private final Destination destination;
-
-    private int deliveryMode;
-    private boolean disableMessageID;
-    private boolean disableMessageTimestamp;
-    private int priority;
-    private long timeToLive;
-    private boolean anonymous = true;
-
-    public PooledProducer(MessageProducer messageProducer, Destination destination) throws JMSException {
-        this.messageProducer = messageProducer;
-        this.destination = destination;
-        this.anonymous = messageProducer.getDestination() == null;
-
-        this.deliveryMode = messageProducer.getDeliveryMode();
-        this.disableMessageID = messageProducer.getDisableMessageID();
-        this.disableMessageTimestamp = messageProducer.getDisableMessageTimestamp();
-        this.priority = messageProducer.getPriority();
-        this.timeToLive = messageProducer.getTimeToLive();
-    }
-
-    @Override
-    public void close() throws JMSException {
-        if (!anonymous) {
-            this.messageProducer.close();
-        }
-    }
-
-    @Override
-    public void send(Destination destination, Message message) throws JMSException {
-        send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
-    }
-
-    @Override
-    public void send(Message message) throws JMSException {
-        send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
-    }
-
-    @Override
-    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
-        send(destination, message, deliveryMode, priority, timeToLive);
-    }
-
-    @Override
-    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
-
-        if (destination == null) {
-            if (messageProducer.getDestination() == null) {
-                throw new UnsupportedOperationException("A destination must be specified.");
-            }
-            throw new InvalidDestinationException("Don't understand null destinations");
-        }
-
-        MessageProducer messageProducer = getMessageProducer();
-
-        // just in case let only one thread send at once
-        synchronized (messageProducer) {
-
-            if (anonymous && this.destination != null && !this.destination.equals(destination)) {
-                throw new UnsupportedOperationException("This producer can only send messages to: " + this.destination);
-            }
-
-            // Producer will do its own Destination validation so always use the destination
-            // based send method otherwise we might violate a JMS rule.
-            messageProducer.send(destination, message, deliveryMode, priority, timeToLive);
-        }
-    }
-
-    @Override
-    public Destination getDestination() {
-        return destination;
-    }
-
-    @Override
-    public int getDeliveryMode() {
-        return deliveryMode;
-    }
-
-    @Override
-    public void setDeliveryMode(int deliveryMode) {
-        this.deliveryMode = deliveryMode;
-    }
-
-    @Override
-    public boolean getDisableMessageID() {
-        return disableMessageID;
-    }
-
-    @Override
-    public void setDisableMessageID(boolean disableMessageID) {
-        this.disableMessageID = disableMessageID;
-    }
-
-    @Override
-    public boolean getDisableMessageTimestamp() {
-        return disableMessageTimestamp;
-    }
-
-    @Override
-    public void setDisableMessageTimestamp(boolean disableMessageTimestamp) {
-        this.disableMessageTimestamp = disableMessageTimestamp;
-    }
-
-    @Override
-    public int getPriority() {
-        return priority;
-    }
-
-    @Override
-    public void setPriority(int priority) {
-        this.priority = priority;
-    }
-
-    @Override
-    public long getTimeToLive() {
-        return timeToLive;
-    }
-
-    @Override
-    public void setTimeToLive(long timeToLive) {
-        this.timeToLive = timeToLive;
-    }
-
-    // Implementation methods
-    // -------------------------------------------------------------------------
-    protected MessageProducer getMessageProducer() {
-        return messageProducer;
-    }
-
-    protected boolean isAnonymous() {
-        return anonymous;
-    }
-
-    @Override
-    public String toString() {
-        return "PooledProducer { " + messageProducer + " }";
-    }
-}

http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledQueueSender.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledQueueSender.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledQueueSender.java
deleted file mode 100644
index f0acca2..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledQueueSender.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Queue;
-import javax.jms.QueueSender;
-
-/**
- * 
- */
-public class PooledQueueSender extends PooledProducer implements QueueSender {
-
-    public PooledQueueSender(QueueSender messageProducer, Destination destination) throws JMSException {
-        super(messageProducer, destination);
-    }
-
-    public void send(Queue queue, Message message, int i, int i1, long l) throws JMSException {
-        getQueueSender().send(queue, message, i, i1, l);
-    }
-
-    public void send(Queue queue, Message message) throws JMSException {
-        getQueueSender().send(queue, message);
-    }
-
-    public Queue getQueue() throws JMSException {
-        return getQueueSender().getQueue();
-    }
-
-
-    protected QueueSender getQueueSender() {
-        return (QueueSender) getMessageProducer();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSession.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSession.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSession.java
deleted file mode 100644
index 941732e..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSession.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-import org.apache.commons.pool2.KeyedObjectPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.jms.XASession;
-import javax.transaction.xa.XAResource;
-import java.io.Serializable;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-public class PooledSession implements Session, TopicSession, QueueSession, XASession {
-    private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
-
-    private final SessionKey key;
-    private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
-    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<>();
-    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<>();
-    private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<>();
-
-    private MessageProducer producer;
-    private TopicPublisher publisher;
-    private QueueSender sender;
-
-    private Session session;
-    private boolean transactional = true;
-    private boolean ignoreClose;
-    private boolean isXa;
-    private boolean useAnonymousProducers = true;
-
-    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional, boolean anonymous) {
-        this.key = key;
-        this.session = session;
-        this.sessionPool = sessionPool;
-        this.transactional = transactional;
-        this.useAnonymousProducers = anonymous;
-    }
-
-    public void addSessionEventListener(PooledSessionEventListener listener) {
-        // only add if really needed
-        if (!sessionEventListeners.contains(listener)) {
-            this.sessionEventListeners.add(listener);
-        }
-    }
-
-    protected boolean isIgnoreClose() {
-        return ignoreClose;
-    }
-
-    protected void setIgnoreClose(boolean ignoreClose) {
-        this.ignoreClose = ignoreClose;
-    }
-
-    @Override
-    public void close() throws JMSException {
-        if (!ignoreClose) {
-            boolean invalidate = false;
-            try {
-                // lets reset the session
-                getInternalSession().setMessageListener(null);
-
-                // Close any consumers and browsers that may have been created.
-                for (MessageConsumer consumer : consumers) {
-                    consumer.close();
-                }
-
-                for (QueueBrowser browser : browsers) {
-                    browser.close();
-                }
-
-                if (transactional && !isXa) {
-                    try {
-                        getInternalSession().rollback();
-                    } catch (JMSException e) {
-                        invalidate = true;
-                        LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
-                    }
-                }
-            } catch (JMSException ex) {
-                invalidate = true;
-                LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
-            } finally {
-                consumers.clear();
-                browsers.clear();
-                for (PooledSessionEventListener listener : this.sessionEventListeners) {
-                    listener.onSessionClosed(this);
-                }
-                sessionEventListeners.clear();
-            }
-
-            if (invalidate) {
-                // lets close the session and not put the session back into the pool
-                // instead invalidate it so the pool can create a new one on demand.
-                if (session != null) {
-                    try {
-                        session.close();
-                    } catch (JMSException e1) {
-                        LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
-                    }
-                    session = null;
-                }
-                try {
-                    sessionPool.invalidateObject(key, this);
-                } catch (Exception e) {
-                    LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
-                }
-            } else {
-                try {
-                    sessionPool.returnObject(key, this);
-                } catch (Exception e) {
-                    javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
-                    illegalStateException.initCause(e);
-                    throw illegalStateException;
-                }
-            }
-        }
-    }
-
-    @Override
-    public void commit() throws JMSException {
-        getInternalSession().commit();
-    }
-
-    @Override
-    public BytesMessage createBytesMessage() throws JMSException {
-        return getInternalSession().createBytesMessage();
-    }
-
-    @Override
-    public MapMessage createMapMessage() throws JMSException {
-        return getInternalSession().createMapMessage();
-    }
-
-    @Override
-    public Message createMessage() throws JMSException {
-        return getInternalSession().createMessage();
-    }
-
-    @Override
-    public ObjectMessage createObjectMessage() throws JMSException {
-        return getInternalSession().createObjectMessage();
-    }
-
-    @Override
-    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
-        return getInternalSession().createObjectMessage(serializable);
-    }
-
-    @Override
-    public Queue createQueue(String s) throws JMSException {
-        return getInternalSession().createQueue(s);
-    }
-
-    @Override
-    public StreamMessage createStreamMessage() throws JMSException {
-        return getInternalSession().createStreamMessage();
-    }
-
-    @Override
-    public TemporaryQueue createTemporaryQueue() throws JMSException {
-        TemporaryQueue result;
-
-        result = getInternalSession().createTemporaryQueue();
-
-        // Notify all of the listeners of the created temporary Queue.
-        for (PooledSessionEventListener listener : this.sessionEventListeners) {
-            listener.onTemporaryQueueCreate(result);
-        }
-
-        return result;
-    }
-
-    @Override
-    public TemporaryTopic createTemporaryTopic() throws JMSException {
-        TemporaryTopic result;
-
-        result = getInternalSession().createTemporaryTopic();
-
-        // Notify all of the listeners of the created temporary Topic.
-        for (PooledSessionEventListener listener : this.sessionEventListeners) {
-            listener.onTemporaryTopicCreate(result);
-        }
-
-        return result;
-    }
-
-    @Override
-    public void unsubscribe(String s) throws JMSException {
-        getInternalSession().unsubscribe(s);
-    }
-
-    @Override
-    public TextMessage createTextMessage() throws JMSException {
-        return getInternalSession().createTextMessage();
-    }
-
-    @Override
-    public TextMessage createTextMessage(String s) throws JMSException {
-        return getInternalSession().createTextMessage(s);
-    }
-
-    @Override
-    public Topic createTopic(String s) throws JMSException {
-        return getInternalSession().createTopic(s);
-    }
-
-    @Override
-    public int getAcknowledgeMode() throws JMSException {
-        return getInternalSession().getAcknowledgeMode();
-    }
-
-    @Override
-    public boolean getTransacted() throws JMSException {
-        return getInternalSession().getTransacted();
-    }
-
-    @Override
-    public void recover() throws JMSException {
-        getInternalSession().recover();
-    }
-
-    @Override
-    public void rollback() throws JMSException {
-        getInternalSession().rollback();
-    }
-
-    @Override
-    public XAResource getXAResource() {
-        if (session instanceof XASession) {
-            return ((XASession) session).getXAResource();
-        }
-        return null;
-    }
-
-    @Override
-    public Session getSession() {
-        return this;
-    }
-
-    @Override
-    public void run() {
-        if (session != null) {
-            session.run();
-        }
-    }
-
-    // Consumer related methods
-    // -------------------------------------------------------------------------
-    @Override
-    public QueueBrowser createBrowser(Queue queue) throws JMSException {
-        return addQueueBrowser(getInternalSession().createBrowser(queue));
-    }
-
-    @Override
-    public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
-        return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
-    }
-
-    @Override
-    public MessageConsumer createConsumer(Destination destination) throws JMSException {
-        return addConsumer(getInternalSession().createConsumer(destination));
-    }
-
-    @Override
-    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
-        return addConsumer(getInternalSession().createConsumer(destination, selector));
-    }
-
-    @Override
-    public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
-        return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
-    }
-
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
-        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
-    }
-
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
-        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
-    }
-
-    @Override
-    public MessageListener getMessageListener() throws JMSException {
-        return getInternalSession().getMessageListener();
-    }
-
-    @Override
-    public void setMessageListener(MessageListener messageListener) throws JMSException {
-        getInternalSession().setMessageListener(messageListener);
-    }
-
-    @Override
-    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
-        return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic));
-    }
-
-    @Override
-    public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
-        return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local));
-    }
-
-    @Override
-    public QueueReceiver createReceiver(Queue queue) throws JMSException {
-        return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue));
-    }
-
-    @Override
-    public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
-        return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector));
-    }
-
-    // Producer related methods
-    // -------------------------------------------------------------------------
-    @Override
-    public MessageProducer createProducer(Destination destination) throws JMSException {
-        return new PooledProducer(getMessageProducer(destination), destination);
-    }
-
-    @Override
-    public QueueSender createSender(Queue queue) throws JMSException {
-        return new PooledQueueSender(getQueueSender(queue), queue);
-    }
-
-    @Override
-    public TopicPublisher createPublisher(Topic topic) throws JMSException {
-        return new PooledTopicPublisher(getTopicPublisher(topic), topic);
-    }
-
-    public Session getInternalSession() throws IllegalStateException {
-        if (session == null) {
-            throw new IllegalStateException("The session has already been closed");
-        }
-        return session;
-    }
-
-    public MessageProducer getMessageProducer() throws JMSException {
-        return getMessageProducer(null);
-    }
-
-    public MessageProducer getMessageProducer(Destination destination) throws JMSException {
-        MessageProducer result = null;
-
-        if (useAnonymousProducers) {
-            if (producer == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (producer == null) {
-                        producer = getInternalSession().createProducer(null);
-                    }
-                }
-            }
-
-            result = producer;
-        } else {
-            result = getInternalSession().createProducer(destination);
-        }
-
-        return result;
-    }
-
-    public QueueSender getQueueSender() throws JMSException {
-        return getQueueSender(null);
-    }
-
-    public QueueSender getQueueSender(Queue destination) throws JMSException {
-        QueueSender result = null;
-
-        if (useAnonymousProducers) {
-            if (sender == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (sender == null) {
-                        sender = ((QueueSession) getInternalSession()).createSender(null);
-                    }
-                }
-            }
-
-            result = sender;
-        } else {
-            result = ((QueueSession) getInternalSession()).createSender(destination);
-        }
-
-        return result;
-    }
-
-    public TopicPublisher getTopicPublisher() throws JMSException {
-        return getTopicPublisher(null);
-    }
-
-    public TopicPublisher getTopicPublisher(Topic destination) throws JMSException {
-        TopicPublisher result = null;
-
-        if (useAnonymousProducers) {
-            if (publisher == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (publisher == null) {
-                        publisher = ((TopicSession) getInternalSession()).createPublisher(null);
-                    }
-                }
-            }
-
-            result = publisher;
-        } else {
-            result = ((TopicSession) getInternalSession()).createPublisher(destination);
-        }
-
-        return result;
-    }
-
-    private QueueBrowser addQueueBrowser(QueueBrowser browser) {
-        browsers.add(browser);
-        return browser;
-    }
-
-    private MessageConsumer addConsumer(MessageConsumer consumer) {
-        consumers.add(consumer);
-        // must wrap in PooledMessageConsumer to ensure the onConsumerClose
-        // method is invoked when the returned consumer is closed, to avoid memory
-        // leak in this session class in case many consumers is created
-        return new PooledMessageConsumer(this, consumer);
-    }
-
-    private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
-        consumers.add(subscriber);
-        return subscriber;
-    }
-
-    private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
-        consumers.add(receiver);
-        return receiver;
-    }
-
-    public void setIsXa(boolean isXa) {
-        this.isXa = isXa;
-    }
-
-    @Override
-    public String toString() {
-        return "PooledSession { " + session + " }";
-    }
-
-    /**
-     * Callback invoked when the consumer is closed.
-     * <p/>
-     * This is used to keep track of an explicit closed consumer created by this
-     * session, by which we know do not need to keep track of the consumer, as
-     * its already closed.
-     *
-     * @param consumer
-     *            the consumer which is being closed
-     */
-    protected void onConsumerClose(MessageConsumer consumer) {
-        consumers.remove(consumer);
-    }
-}

http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSessionEventListener.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSessionEventListener.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSessionEventListener.java
deleted file mode 100644
index b9a5dd3..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSessionEventListener.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-
-interface PooledSessionEventListener {
-
-    /**
-     * Called on successful creation of a new TemporaryQueue.
-     *
-     * @param tempQueue
-     *      The TemporaryQueue just created.
-     */
-    void onTemporaryQueueCreate(TemporaryQueue tempQueue);
-
-    /**
-     * Called on successful creation of a new TemporaryTopic.
-     *
-     * @param tempTopic
-     *      The TemporaryTopic just created.
-     */
-    void onTemporaryTopicCreate(TemporaryTopic tempTopic);
-
-    /**
-     * Called when the PooledSession is closed.
-     *
-     * @param session
-     *      The PooledSession that has been closed.
-     */
-    void onSessionClosed(PooledSession session);
-
-}

http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledTopicPublisher.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledTopicPublisher.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledTopicPublisher.java
deleted file mode 100644
index b466151..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledTopicPublisher.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-
-/**
- * 
- */
-public class PooledTopicPublisher extends PooledProducer implements TopicPublisher {
-
-    public PooledTopicPublisher(TopicPublisher messageProducer, Destination destination) throws JMSException {
-        super(messageProducer, destination);
-    }
-
-    public Topic getTopic() throws JMSException {
-        return getTopicPublisher().getTopic();
-    }
-
-    public void publish(Message message) throws JMSException {
-        getTopicPublisher().publish((Topic) getDestination(), message);
-    }
-
-    public void publish(Message message, int i, int i1, long l) throws JMSException {
-        getTopicPublisher().publish((Topic) getDestination(), message, i, i1, l);
-    }
-
-    public void publish(Topic topic, Message message) throws JMSException {
-        getTopicPublisher().publish(topic, message);
-    }
-
-    public void publish(Topic topic, Message message, int i, int i1, long l) throws JMSException {
-        getTopicPublisher().publish(topic, message, i, i1, l);
-    }
-
-    protected TopicPublisher getTopicPublisher() {
-        return (TopicPublisher) getMessageProducer();
-    }
-}

http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/SessionKey.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/SessionKey.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/SessionKey.java
deleted file mode 100644
index 389438a..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/SessionKey.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal;
-
-/**
- * A cache key for the session details
- *
- * 
- */
-public class SessionKey {
-
-    private boolean transacted;
-    private int ackMode;
-
-    private int hash;
-
-    public SessionKey(boolean transacted, int ackMode) {
-        this.transacted = transacted;
-        this.ackMode = ackMode;
-        hash = ackMode;
-        if (transacted) {
-            hash = 31 * hash + 1;
-        }
-    }
-
-    public int hashCode() {
-        return hash;
-    }
-
-    public boolean equals(Object that) {
-        if (this == that) {
-            return true;
-        }
-        if (that instanceof SessionKey) {
-            return equals((SessionKey) that);
-        }
-        return false;
-    }
-
-    public boolean equals(SessionKey that) {
-        return this.transacted == that.transacted && this.ackMode == that.ackMode;
-    }
-
-    public boolean isTransacted() {
-        return transacted;
-    }
-
-    public int getAckMode() {
-        return ackMode;
-    }
-}

http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/osgi/Activator.java
----------------------------------------------------------------------
diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/osgi/Activator.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/osgi/Activator.java
deleted file mode 100644
index dfecd1c..0000000
--- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/osgi/Activator.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.jms.pool.internal.osgi;
-
-import org.apache.karaf.jms.pool.internal.PooledConnectionFactory;
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceReference;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.ConnectionFactory;
-import java.util.Hashtable;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-public class Activator implements BundleActivator, ServiceTrackerCustomizer<ConnectionFactory, Activator.ConnectionFactoryData> {
-
-    public static final String PROP_PREFIX = "karaf.jms.";
-
-    public static final String PROP_OPT_IN = PROP_PREFIX + "wrap";
-
-    public static final String PROP_POOL = PROP_PREFIX + "pool.";
-
-    private static final transient Logger LOG = LoggerFactory.getLogger(Activator.class);
-
-    private BundleContext context;
-    private ServiceTracker<ConnectionFactory, ConnectionFactoryData> cfTracker;
-
-    @Override
-    public void start(BundleContext context) throws Exception {
-        this.context = context;
-        cfTracker = new ServiceTracker<>(
-                context,
-                context.createFilter("(&(objectClass=javax.jms.ConnectionFactory)(" + PROP_OPT_IN + "=*))"),
-                this);
-        cfTracker.open();
-    }
-
-    @Override
-    public void stop(BundleContext context) throws Exception {
-        cfTracker.close();
-    }
-
-    @Override
-    public ConnectionFactoryData addingService(ServiceReference<ConnectionFactory> reference) {
-        ConnectionFactoryData data = new ConnectionFactoryData(context, reference);
-        try {
-            data.init();
-            return data;
-        } catch (Throwable t) {
-            LOG.warn("Error creating pooled JMS ConnectionFactory", t);
-            data.destroy();
-            return null;
-        }
-    }
-
-    @Override
-    public void modifiedService(ServiceReference<ConnectionFactory> reference, ConnectionFactoryData service) {
-    }
-
-    @Override
-    public void removedService(ServiceReference<ConnectionFactory> reference, ConnectionFactoryData service) {
-        service.destroy();
-    }
-
-    class ConnectionFactoryData {
-
-        private final BundleContext context;
-        private final ServiceReference<ConnectionFactory> reference;
-        private ConnectionFactory connectionFactory;
-        private PooledConnectionFactory pooledConnectionFactory;
-        private ServiceRegistration<ConnectionFactory> registration;
-
-        ConnectionFactoryData(BundleContext context, ServiceReference<ConnectionFactory> reference) {
-            this.context = context;
-            this.reference = reference;
-        }
-
-        void init() throws Exception {
-            connectionFactory = context.getService(reference);
-            PooledConnectionFactory pcf = new PooledConnectionFactory(connectionFactory);
-            populate(pcf);
-            register(pcf);
-        }
-
-        void destroy() {
-            unregister();
-            if (connectionFactory != null) {
-                try {
-                    context.ungetService(reference);
-                } catch (Exception e) {
-                    // Ignore
-                } finally {
-                    connectionFactory = null;
-                }
-            }
-        }
-
-        void populate(PooledConnectionFactory pcf) {
-            setObject(PROP_POOL + "maxConnections", Integer::parseInt, pcf::setMaxConnections);
-            setObject(PROP_POOL + "maximumActiveSessionPerConnection", Integer::parseInt, pcf::setMaximumActiveSessionPerConnection);
-            setObject(PROP_POOL + "idleTimeout", Integer::parseInt, pcf::setIdleTimeout);
-            setObject(PROP_POOL + "blockIfSessionPoolIsFull", Boolean::parseBoolean, pcf::setBlockIfSessionPoolIsFull);
-            setObject(PROP_POOL + "blockIfSessionPoolIsFullTimeout", Long::parseLong, pcf::setBlockIfSessionPoolIsFullTimeout);
-            setObject(PROP_POOL + "expiryTimeout", Long::parseLong, pcf::setExpiryTimeout);
-            setObject(PROP_POOL + "createConnectionOnStartup", Boolean::parseBoolean, pcf::setCreateConnectionOnStartup);
-            setObject(PROP_POOL + "useAnonymousProducers", Boolean::parseBoolean, pcf::setUseAnonymousProducers);
-        }
-
-        <T> void setObject(String name, Function<String, T> parser, Consumer<T> setter) {
-            Object o = reference.getProperty(name);
-            if (o != null) {
-                setter.accept(parser.apply(o.toString()));
-            }
-        }
-
-        void register(PooledConnectionFactory pcf) {
-            this.pooledConnectionFactory = pcf;
-            Hashtable<String, Object> props = new Hashtable<>();
-            int ranking = 0;
-            for (String key : reference.getPropertyKeys()) {
-                Object value = reference.getProperty(key);
-                if (Constants.SERVICE_RANKING.equals(key)) {
-                    if (value instanceof Integer) {
-                        ranking = (Integer) value;
-                    }
-                } else if (!key.startsWith("service.")
-                        && !key.startsWith(PROP_PREFIX)) {
-                    props.put(key, value);
-                }
-            }
-            props.put(Constants.SERVICE_RANKING, ranking + 1);
-            pcf.start();
-            BundleContext context = reference.getBundle().getBundleContext();
-            registration = context.registerService(ConnectionFactory.class, pooledConnectionFactory, props);
-        }
-
-        void unregister() {
-            if (registration != null) {
-                try {
-                    registration.unregister();
-                } catch (Exception e) {
-                    // Ignore
-                } finally {
-                    registration = null;
-                }
-            }
-            if (pooledConnectionFactory != null) {
-                try {
-                    pooledConnectionFactory.stop();
-                } catch (Exception e) {
-                    // Ignore
-                } finally {
-                    pooledConnectionFactory = null;
-                }
-            }
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/JmsMBean.java
----------------------------------------------------------------------
diff --git a/jms/src/main/java/org/apache/karaf/jms/JmsMBean.java b/jms/src/main/java/org/apache/karaf/jms/JmsMBean.java
new file mode 100644
index 0000000..d62a9ba
--- /dev/null
+++ b/jms/src/main/java/org/apache/karaf/jms/JmsMBean.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.jms;
+
+import javax.management.MBeanException;
+import javax.management.openmbean.TabularData;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JMS MBean.
+ */
+public interface JmsMBean {
+
+    /**
+     * List the JMS connection factories.
+     *
+     * @return The {@link List} of the JMS connection factory names.
+     * @throws MBeanException If the MBean fails.
+     */
+    List<String> getConnectionfactories() throws MBeanException;
+
+    /**
+     * Create a JMS connection factory.
+     *
+     * @param name The JMS connection factory name.
+     * @param type The JMS connection factory type (ActiveMQ or WebsphereMQ).
+     * @param url The JMS connection factory URL. NB: when type is WebsphereMQ, the URL has the format host/port/queuemanager/channel.
+     * @throws MBeanException If the MBean fails.
+     */
+    void create(String name, String type, String url) throws MBeanException;
+
+    /**
+     * Create a JMS connection factory.
+     *
+     * @param name The JMS connection factory name.
+     * @param type The JMS connection factory type (ActiveMQ or WebsphereMQ).
+     * @param url The JMS connection factory URL. NB: when type is WebsphereMQ, the URL has the format host/port/queuemanager/channel.
+     * @param username The JMS connection factory authentication username.
+     * @param password The JMS connection factory authentication password.
+     * @throws MBeanException If the MBean fails.
+     */
+    void create(String name, String type, String url, String username, String password) throws MBeanException;
+
+    /**
+     * Delete a JMS connection factory.
+     *
+     * @param name The JMS connection factory name.
+     * @throws MBeanException If the MBean fails.
+     */
+    void delete(String name) throws MBeanException;
+
+    /**
+     * Get details about a JMS connection factory.
+     *
+     * @param connectionFactory The JMS connection factory name.
+     * @param username The (optional) username to connect to the JMS broker.
+     * @param password The (optional) password to connect to the JMS broker.
+     * @return A {@link Map} (property/value) containing details.
+     * @throws MBeanException If the MBean fails.
+     */
+    Map<String, String> info(String connectionFactory, String username, String password) throws MBeanException;
+
+    /**
+     * Count the messages on a given JMS queue.
+     *
+     * @param connectionFactory The JMS connection factory name.
+     * @param queue The JMS queue name.
+     * @param username The (optional) username to connect to the JMS broker.
+     * @param password The (optional) password to connect to the JMS broker.
+     * @return The number of messages in the queue.
+     * @throws MBeanException If the MBean fails.
+     */
+    int count(String connectionFactory, String queue, String username, String password) throws MBeanException;
+
+    /**
+     * List the JMS queues.
+     *
+     * @param connectionFactory The JMS connection factory name.
+     * @param username The (optional) username to connect to the JMS broker.
+     * @param password The (optional) password to connect to the JMS broker.
+     * @return The {@link List} of JMS queues.
+     * @throws MBeanException If the MBean fails.
+     */
+    List<String> queues(String connectionFactory, String username, String password) throws MBeanException;
+
+    /**
+     * List the JMS topics.
+     *
+     * @param connectionFactory The JMS connection factory name.
+     * @param username The (optional) username to connect to the JMS broker.
+     * @param password The (optional) password to connect to the JMS broker.
+     * @return The {@link List} of JMS topics.
+     * @throws MBeanException If the MBean fails.
+     */
+    List<String> topics(String connectionFactory, String username, String password) throws MBeanException;
+
+    /**
+     * Browse the messages in a JMS queue.
+     *
+     * @param connectionFactory The JMS connection factory name.
+     * @param queue The JMS queue name.
+     * @param selector A selector to use to browse only certain messages.
+     * @param username The (optional) username to connect to the JMS broker.
+     * @param password The (optional) password to connect to the JMS broker.
+     * @return A {@link TabularData} containing messages details.
+     * @throws MBeanException If the MBean fails.
+     */
+    TabularData browse(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException;
+
+    /**
+     * Send a JMS message to a given queue.
+     *
+     * @param connectionFactory The JMS connection factory name.
+     * @param queue The JMS queue name.
+     * @param content The message content.
+     * @param replyTo The message ReplyTo.
+     * @param username The (optional) username to connect to the JMS broker.
+     * @param password The (optional) password to connect to the JMS broker.
+     * @throws MBeanException If the MBean fails.
+     */
+    void send(String connectionFactory, String queue, String content, String replyTo, String username, String password) throws MBeanException;
+
+    /**
+     * Consume JMS messages from a given queue.
+     *
+     * @param connectionFactory The JMS connection factory name.
+     * @param queue The JMS queue name.
+     * @param selector A selector to use to consume only certain messages.
+     * @param username The (optional) username to connect to the JMS broker.
+     * @param password The (optional) password to connect to the JMS broker.
+     * @return The number of messages consumed.
+     * @throws MBeanException If the MBean fails.
+     */
+    int consume(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException;
+
+    /**
+     * Move JMS messages from one queue to another.
+     *
+     * @param connectionFactory The JMS connection factory name.
+     * @param source The source JMS queue name.
+     * @param destination The destination JMS queue name.
+     * @param selector A selector to move only certain messages.
+     * @param username The (optional) username to connect to the JMS broker.
+     * @param password The (optional) password to connect to the JMS broker.
+     * @return The number of messages moved.
+     * @throws MBeanException If the MBean fails.
+     */
+    int move(String connectionFactory, String source, String destination, String selector, String username, String password) throws MBeanException;
+
+}

http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/JmsMessage.java
----------------------------------------------------------------------
diff --git a/jms/src/main/java/org/apache/karaf/jms/JmsMessage.java b/jms/src/main/java/org/apache/karaf/jms/JmsMessage.java
new file mode 100644
index 0000000..acf13bf
--- /dev/null
+++ b/jms/src/main/java/org/apache/karaf/jms/JmsMessage.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.jms;
+
+import javax.jms.*;
+import java.io.UnsupportedEncodingException;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Describes a JMS message in a more human-readable way.
+ */
+public class JmsMessage {
+
+    private Map<String, Object> properties = new HashMap<>(); // message properties keyed by property name
+
+    private String content; // body text; "" for messages that are neither TextMessage nor BytesMessage
+    private String charset = "UTF-8"; // charset name used to decode a BytesMessage body
+    private String correlationID;
+    private String deliveryMode; // "Non Persistent" or "Persistent"
+    private String destination; // toString() of the JMS destination; stays null when the message has none
+    private String expiration; // Date#toString() of JMSExpiration, or "Never" when it is not positive
+    private String messageId;
+    private int priority;
+    private boolean redelivered;
+    private String replyTo; // toString() of the ReplyTo destination; stays null when the message has none
+    private String timestamp; // Date#toString() of JMSTimestamp, or "" when it is not positive
+    private String type;
+
+    public JmsMessage(Message message) { // copies the headers, properties and body text of the given message
+        try {
+            initFromMessage(message); // NOTE(review): public, overridable method invoked from the constructor
+        } catch (JMSException e) {
+            throw new RuntimeException(e.getMessage(), e); // rethrown unchecked, original exception kept as cause
+        }
+    }
+
+    public void initFromMessage(Message message) throws JMSException { // fills every field from the message
+        @SuppressWarnings("unchecked")
+        Enumeration<String> names = message.getPropertyNames();
+        while (names.hasMoreElements()) { // existing map entries are not cleared first
+            String key = names.nextElement();
+            Object value = message.getObjectProperty(key);
+            properties.put(key, value);
+        }
+
+        correlationID = message.getJMSCorrelationID();
+        if (message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT) {
+            deliveryMode = "Non Persistent";
+        } else { // every other delivery mode value is reported as persistent
+            deliveryMode = "Persistent";
+        }
+        Destination destinationDest = message.getJMSDestination();
+        if (destinationDest != null) {
+            destination = destinationDest.toString();
+        }
+        if (message.getJMSExpiration() > 0) {
+            expiration = new Date(message.getJMSExpiration()).toString();
+        } else { // zero or negative expiration is shown as "Never"
+            expiration = "Never";
+        }
+        messageId = message.getJMSMessageID();
+        priority = message.getJMSPriority();
+        redelivered = message.getJMSRedelivered();
+        Destination replyToDest = message.getJMSReplyTo();
+        if (replyToDest != null) {
+            replyTo = replyToDest.toString();
+        }
+        if (message.getJMSTimestamp() > 0) {
+            timestamp = new Date(message.getJMSTimestamp()).toString();
+        } else { // zero or negative timestamp is shown as an empty string
+            timestamp = "";
+        }
+        type = message.getJMSType();
+        content = getMessageContent(message);
+    }
+
+
+    private String getMessageContent(Message message) throws JMSException { // body as text, or "" if unsupported
+        if (message instanceof TextMessage) {
+            return ((TextMessage) message).getText();
+        } else if (message instanceof BytesMessage) {
+            BytesMessage bMessage = (BytesMessage) message;
+            long length = bMessage.getBodyLength();
+            byte[] content = new byte[(int) length]; // NOTE(review): narrowing cast; bodies over Integer.MAX_VALUE bytes not handled
+            bMessage.readBytes(content); // NOTE(review): the returned byte count is ignored
+            try {
+                return new String(content, charset); // decoded with the charset field ("UTF-8" by default)
+            } catch (UnsupportedEncodingException e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+        }
+        return ""; // any other message type yields an empty body
+    }
+
+    public Map<String, Object> getProperties() { // returns the internal map itself, not a copy
+        return properties;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public String getCharset() {
+        return charset;
+    }
+
+    public String getCorrelationID() {
+        return correlationID;
+    }
+
+    public String getDeliveryMode() {
+        return deliveryMode;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public String getExpiration() {
+        return expiration;
+    }
+
+    public String getMessageId() {
+        return messageId;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public boolean isRedelivered() {
+        return redelivered;
+    }
+
+    public String getReplyTo() {
+        return replyTo;
+    }
+
+    public String getTimestamp() {
+        return timestamp;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+}
\ No newline at end of file