You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC

svn commit: r447994 [20/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur...

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+/**
+ * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
+ *
+ */
+class TopicSubscriberAdaptor implements TopicSubscriber
+{
+    private final Topic _topic;
+    private final MessageConsumer _consumer;
+    private final boolean _noLocal;
+
+    TopicSubscriberAdaptor(Topic topic, MessageConsumer consumer, boolean noLocal)
+    {
+        _topic = topic;
+        _consumer = consumer;
+        _noLocal = noLocal;
+    }
+    TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer)
+    {
+        this(topic, consumer, consumer.isNoLocal());
+    }
+    public Topic getTopic() throws JMSException
+    {
+        return _topic;
+    }
+
+    public boolean getNoLocal() throws JMSException
+    {
+        return _noLocal;
+    }
+
+    public String getMessageSelector() throws JMSException
+    {
+        return _consumer.getMessageSelector();
+    }
+
+    public MessageListener getMessageListener() throws JMSException
+    {
+        return _consumer.getMessageListener();
+    }
+
+    public void setMessageListener(MessageListener messageListener) throws JMSException
+    {
+        _consumer.setMessageListener(messageListener);
+    }
+
+    public Message receive() throws JMSException
+    {
+        return _consumer.receive();
+    }
+
+    public Message receive(long l) throws JMSException
+    {
+        return _consumer.receive(l);
+    }
+
+    public Message receiveNoWait() throws JMSException
+    {
+        return _consumer.receiveNoWait();
+    }
+
+    public void close() throws JMSException
+    {
+        _consumer.close();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverException.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+/**
+ * This exception is thrown when failover is taking place and we need to let other
+ * parts of the client know about this.
+ */
+public class FailoverException extends RuntimeException
+{
+    public FailoverException(String message)
+    {
+        super(message);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,180 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.failover.FailoverState;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * When failover is required, we need a separate thread to handle the establishment of the new connection and
+ * the transfer of subscriptions.
+ * </p>
+ * The reason this needs to be a separate thread is because you cannot do this work inside the MINA IO processor
+ * thread. One significant task is the connection setup which involves a protocol exchange until a particular state
+ * is achieved. However if you do this in the MINA thread, you have to block until the state is achieved which means
+ * the IO processor is not able to do anything at all.
+ */
+public class FailoverHandler implements Runnable
+{
+    private static final Logger _logger = Logger.getLogger(FailoverHandler.class);
+
+    private final IoSession _session;
+    private AMQProtocolHandler _amqProtocolHandler;
+
+    /**
+     * Used where forcing the failover host
+     */
+    private String _host;
+
+    /**
+     * Used where forcing the failover port
+     */
+    private int _port;
+
+    public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
+    {
+        _amqProtocolHandler = amqProtocolHandler;
+        _session = session;
+    }
+
+    public void run()
+    {
+        if (Thread.currentThread().isDaemon())
+        {
+            throw new IllegalStateException("FailoverHandler must run on a non-daemon thread.");
+        }
+        //Thread.currentThread().setName("Failover Thread");
+
+        _amqProtocolHandler.setFailoverLatch(new CountDownLatch(1));
+
+        // We wake up listeners. If they can handle failover, they will extend the
+        // FailoverSupport class and will in turn block on the latch until failover
+        // has completed before retrying the operation
+        _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start"));
+
+        // Since failover impacts several structures we protect them all with a single mutex. These structures
+        // are also in child objects of the connection. This allows us to manipulate them without affecting
+        // client code which runs in a separate thread.
+        synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
+        {
+            _logger.info("Starting failover process");
+
+            // We switch in a new state manager temporarily so that the interaction to get to the "connection open"
+            // state works, without us having to terminate any existing "state waiters". We could theoretically
+            // have a state waiter waiting until the connection is closed for some reason. Or in future we may have
+            // a slightly more complex state model therefore I felt it was worthwhile doing this.
+            AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
+            _amqProtocolHandler.setStateManager(new AMQStateManager());
+            if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
+            {
+                _amqProtocolHandler.setStateManager(existingStateManager);
+                if (_host != null)
+                {
+                    _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client"));
+                }
+                else
+                {
+                    _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client"));
+                }
+                _amqProtocolHandler.getFailoverLatch().countDown();
+                _amqProtocolHandler.setFailoverLatch(null);
+                return;
+            }
+            boolean failoverSucceeded;
+            // when host is non null we have a specified failover host otherwise we all the client to cycle through
+            // all specified hosts
+
+            // if _host has value then we are performing a redirect.
+            if (_host != null)
+            {
+                failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port, _amqProtocolHandler.isUseSSL());
+            }
+            else
+            {
+                failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection();
+            }
+            if (!failoverSucceeded)
+            {
+                _amqProtocolHandler.setStateManager(existingStateManager);
+                _amqProtocolHandler.getConnection().exceptionReceived(
+                        new AMQDisconnectedException("Server closed connection and no failover " +
+                                "was successful"));
+            }
+            else
+            {
+                _amqProtocolHandler.setStateManager(existingStateManager);
+                try
+                {
+                    if (_amqProtocolHandler.getConnection().firePreResubscribe())
+                    {
+                        _logger.info("Resubscribing on new connection");
+                        _amqProtocolHandler.getConnection().resubscribeSessions();
+                    }
+                    else
+                    {
+                        _logger.info("Client vetoed automatic resubscription");
+                    }
+                    _amqProtocolHandler.getConnection().fireFailoverComplete();
+                    _amqProtocolHandler.setFailoverState(FailoverState.NOT_STARTED);
+                    _logger.info("Connection failover completed successfully");
+                }
+                catch (Exception e)
+                {
+                    _logger.info("Failover process failed - exception being propagated by protocol handler");
+                    _amqProtocolHandler.setFailoverState(FailoverState.FAILED);
+                    try
+                    {
+                        _amqProtocolHandler.exceptionCaught(_session, e);
+                    }
+                    catch (Exception ex)
+                    {
+                        _logger.error("Error notifying protocol session of error: " + ex, ex);
+                    }
+                }
+            }
+        }
+        _amqProtocolHandler.getFailoverLatch().countDown();
+    }
+
+    public String getHost()
+    {
+        return _host;
+    }
+
+    public void setHost(String host)
+    {
+        _host = host;
+    }
+
+    public int getPort()
+    {
+        return _port;
+    }
+
+    public void setPort(int port)
+    {
+        _port = port;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverState.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverState.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverState.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+/**
+ * Enumeration of failover states. Used to handle failover from within AMQProtocolHandler where MINA events need to be
+ * dealt with and can happen during failover.
+ */
+public final class FailoverState
+{
+    private final String _state;
+
+    /** Failover has not yet been attempted on this connection */
+    public static final FailoverState NOT_STARTED = new FailoverState("NOT STARTED");
+
+    /** Failover has been requested on this connection but has not completed */
+    public static final FailoverState IN_PROGRESS = new FailoverState("IN PROGRESS");
+
+    /** Failover has been attempted and failed */
+    public static final FailoverState FAILED = new FailoverState("FAILED");
+
+    private FailoverState(String state)
+    {
+        _state = state;
+    }
+
+    public String toString()
+    {
+        return "FailoverState: " + _state;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverSupport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverSupport.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverSupport.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverSupport.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.failover.FailoverException;
+
+import javax.jms.JMSException;
+
+public abstract class FailoverSupport
+{
+    private static final Logger _log = Logger.getLogger(FailoverSupport.class);
+
+    public Object execute(AMQConnection con) throws JMSException
+    {
+        // We wait until we are not in the middle of failover before acquiring the mutex and then proceeding.
+        // Any method that can potentially block for any reason should use this class so that deadlock will not
+        // occur. The FailoverException is propagated by the AMQProtocolHandler to any listeners (e.g. frame listeners)
+        // that might be causing a block. When that happens, the exception is caught here and the mutex is released
+        // before waiting for the failover to complete (either successfully or unsuccessfully).
+        while (true)
+        {
+            try
+            {
+                con.blockUntilNotFailingOver();
+            }
+            catch (InterruptedException e)
+            {
+                _log.info("Interrupted: " + e, e);
+                return null;
+            }
+            synchronized (con.getFailoverMutex())
+            {
+                try
+                {
+                    return operation();
+                }
+                catch (FailoverException e)
+                {
+                    _log.info("Failover exception caught during operation");
+                }
+            }
+        }
+    }
+
+    protected abstract Object operation() throws JMSException;
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.message.UnprocessedMessage;
+
+public class BasicDeliverMethodHandler implements StateAwareMethodListener
+{
+    private static final Logger _logger = Logger.getLogger(BasicDeliverMethodHandler.class);
+
+    private static final BasicDeliverMethodHandler _instance = new BasicDeliverMethodHandler();
+
+    public static BasicDeliverMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+    {
+        final UnprocessedMessage msg = new UnprocessedMessage();
+        msg.deliverBody = (BasicDeliverBody) evt.getMethod();
+        msg.channelId = evt.getChannelId();
+        _logger.debug("New JmsDeliver method received");
+        evt.getProtocolSession().unprocessedMessageReceived(msg);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.BasicReturnBody;
+
+public class BasicReturnMethodHandler implements StateAwareMethodListener
+{
+    private static final Logger _logger = Logger.getLogger(BasicReturnMethodHandler.class);
+
+    private static final BasicReturnMethodHandler _instance = new BasicReturnMethodHandler();
+
+    public static BasicReturnMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+    {
+        _logger.debug("New JmsBounce method received");
+        final UnprocessedMessage msg = new UnprocessedMessage();
+        msg.deliverBody = null;
+        msg.bounceBody = (BasicReturnBody) evt.getMethod();
+        msg.channelId = evt.getChannelId();
+
+        evt.getProtocolSession().unprocessedMessageReceived(msg);
+    }
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+
+public class ChannelCloseMethodHandler implements StateAwareMethodListener
+{
+    private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandler.class);
+
+    private static ChannelCloseMethodHandler _handler = new ChannelCloseMethodHandler();
+
+    public static ChannelCloseMethodHandler getInstance()
+    {
+        return _handler;
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+    {
+         _logger.debug("ChannelClose method received");
+        ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+
+        int errorCode = method.replyCode;
+        String reason = method.replyText;
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
+        }
+
+        AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+        evt.getProtocolSession().writeFrame(frame);
+        if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
+        {
+            _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+            if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
+            {
+                throw new AMQNoConsumersException("Error: " + reason, null);
+            }
+            else
+            {
+                if (errorCode == AMQConstant.NO_ROUTE.getCode())
+                {
+                   throw new AMQNoRouteException("Error: " + reason, null);
+                }
+                else
+                {
+                    throw new AMQChannelClosedException(errorCode, "Error: " + reason);
+                }
+            }
+        }
+        evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
+public class ChannelCloseOkMethodHandler implements StateAwareMethodListener
+{
+    private static final Logger _logger = Logger.getLogger(ChannelCloseOkMethodHandler.class);
+
+    private static final ChannelCloseOkMethodHandler _instance = new ChannelCloseOkMethodHandler();
+
+    public static ChannelCloseOkMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+    {
+        _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
+
+        //todo this should do the closure
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+
+public class ChannelFlowOkMethodHandler implements StateAwareMethodListener
+{
+     private static final Logger _logger = Logger.getLogger(ChannelFlowOkMethodHandler.class);
+     private static final ChannelFlowOkMethodHandler _instance = new ChannelFlowOkMethodHandler();
+
+     public static ChannelFlowOkMethodHandler getInstance()
+     {
+         return _instance;
+     }
+
+     private ChannelFlowOkMethodHandler()
+     {
+     }
+
+     public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+     {
+         ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod();
+         _logger.debug("Received Channel.Flow-Ok message, active = " + method.active);
+     }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionClosedException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+
+public class ConnectionCloseMethodHandler implements StateAwareMethodListener
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
+
+    private static ConnectionCloseMethodHandler _handler = new ConnectionCloseMethodHandler();
+
+    public static ConnectionCloseMethodHandler getInstance()
+    {
+        return _handler;
+    }
+
+    private ConnectionCloseMethodHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+    {
+        _logger.info("ConnectionClose frame received");
+        ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod();
+
+        // does it matter
+        //stateManager.changeState(AMQState.CONNECTION_CLOSING);
+
+        int errorCode = method.replyCode;
+        String reason = method.replyText;
+
+        // TODO: check whether channel id of zero is appropriate
+        evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0));
+
+        if (errorCode != 200)
+        {
+            if(errorCode == AMQConstant.NOT_ALLOWED.getCode())
+            {
+                _logger.info("Authentication Error:"+Thread.currentThread().getName());
+
+                evt.getProtocolSession().closeProtocolSession();
+
+                 //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
+                 stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
+
+                throw new AMQAuthenticationException(errorCode, reason);
+            }
+            else
+            {
+                _logger.info("Connection close received with error code " + errorCode);
+
+
+                throw new AMQConnectionClosedException(errorCode, "Error: " + reason);
+            }
+        }
+
+        // this actually closes the connection in the case where it is not an error.
+
+        evt.getProtocolSession().closeProtocolSession();
+
+        stateManager.changeState(AMQState.CONNECTION_CLOSED);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+
+public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener
+{
+
+    private static final Logger _logger = Logger.getLogger(ConnectionOpenOkMethodHandler.class);
+
+    private static final ConnectionOpenOkMethodHandler _instance = new ConnectionOpenOkMethodHandler();
+
+    public static ConnectionOpenOkMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private ConnectionOpenOkMethodHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+    {
+        AMQProtocolSession session = evt.getProtocolSession();
+        ConnectionOpenOkBody method = (ConnectionOpenOkBody) evt.getMethod();        
+        stateManager.changeState(AMQState.CONNECTION_OPEN);
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ConnectionRedirectBody;
+
+public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionRedirectMethodHandler.class);
+
+    private static final int DEFAULT_REDIRECT_PORT = 5672;
+
+    private static ConnectionRedirectMethodHandler _handler = new ConnectionRedirectMethodHandler();
+
+    public static ConnectionRedirectMethodHandler getInstance()
+    {
+        return _handler;
+    }
+
+    private ConnectionRedirectMethodHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+    {
+        _logger.info("ConnectionRedirect frame received");
+        ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod();
+
+        // the host is in the form hostname:port with the port being optional
+        int portIndex = method.host.indexOf(':');
+        String host;
+        int port;
+        if (portIndex == -1)
+        {
+            host = method.host;
+            port = DEFAULT_REDIRECT_PORT;
+        }
+        else
+        {
+            host = method.host.substring(0, portIndex);
+            port = Integer.parseInt(method.host.substring(portIndex + 1));
+        }
+        evt.getProtocolSession().failover(host, port);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+public class ConnectionSecureMethodHandler implements StateAwareMethodListener
+{
+    private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler();
+
+    public static ConnectionSecureMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+    {
+        SaslClient client = evt.getProtocolSession().getSaslClient();
+        if (client == null)
+        {
+            throw new AMQException("No SASL client set up - cannot proceed with authentication");
+        }
+
+        ConnectionSecureBody body = (ConnectionSecureBody) evt.getMethod();
+
+        try
+        {
+            // Evaluate server challenge
+            byte[] response = client.evaluateChallenge(body.challenge);
+            AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), response);
+            evt.getProtocolSession().writeFrame(responseFrame);
+        }
+        catch (SaslException e)
+        {
+            throw new AMQException("Error processing SASL challenge: " + e, e);
+        }
+
+
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,184 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.security.AMQCallbackHandler;
+import org.apache.qpid.client.security.CallbackHandlerRegistry;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.FieldTable;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.StringTokenizer;
+
+public class ConnectionStartMethodHandler implements StateAwareMethodListener
+{
+
+    private static final Logger _log = Logger.getLogger(ConnectionStartMethodHandler.class);
+
+    private static final ConnectionStartMethodHandler _instance = new ConnectionStartMethodHandler();
+
+    public static ConnectionStartMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private ConnectionStartMethodHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+    {
+        ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
+
+        try
+        {
+            // the mechanism we are going to use
+            String mechanism;
+            if (body.mechanisms == null)
+            {
+                throw new AMQException("mechanism not specified in ConnectionStart method frame");
+            }
+            else
+            {
+                mechanism = chooseMechanism(body.mechanisms);
+            }
+
+            if (mechanism == null)
+            {
+                throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
+            }
+
+            final AMQProtocolSession ps = evt.getProtocolSession();
+            byte[] saslResponse;
+            try
+            {
+                SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
+                                                      null, "AMQP", "localhost",
+                                                      null, createCallbackHandler(mechanism, ps));
+                if (sc == null)
+                {
+                    throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " +
+                                           mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " +
+                                           " details of how to register non-standard SASL client providers.");
+                }
+                ps.setSaslClient(sc);
+                saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
+            }
+            catch (SaslException e)
+            {
+                ps.setSaslClient(null);
+                throw new AMQException("Unable to create SASL client: " + e, e);
+            }
+
+            if (body.locales == null)
+            {
+                throw new AMQException("Locales is not defined in Connection Start method");
+            }
+            final String locales = new String(body.locales, "utf8");
+            final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
+            String selectedLocale = null;
+            if (tokenizer.hasMoreTokens())
+            {
+                selectedLocale = tokenizer.nextToken();
+            }
+            else
+            {
+                throw new AMQException("No locales sent from server, passed: " + locales);
+            }
+
+            stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+            FieldTable clientProperties = new FieldTable();
+            clientProperties.put("instance", ps.getClientID());
+            clientProperties.put("product", "Qpid");
+            clientProperties.put("version", "1.0");
+            clientProperties.put("platform", getFullSystemInfo());
+            ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
+                                                               saslResponse, selectedLocale));
+        }
+        catch (UnsupportedEncodingException e)
+        {
+            throw new AMQException(_log, "Unable to decode data: " + e, e);
+        }
+    }
+
+    private String getFullSystemInfo()
+    {
+        StringBuffer fullSystemInfo = new StringBuffer();
+        fullSystemInfo.append(System.getProperty("java.runtime.name"));
+        fullSystemInfo.append(", " + System.getProperty("java.runtime.version"));
+        fullSystemInfo.append(", " + System.getProperty("java.vendor"));
+        fullSystemInfo.append(", " + System.getProperty("os.arch"));
+        fullSystemInfo.append(", " + System.getProperty("os.name"));
+        fullSystemInfo.append(", " + System.getProperty("os.version"));
+        fullSystemInfo.append(", " + System.getProperty("sun.os.patch.level"));
+
+        return fullSystemInfo.toString();
+    }
+
+    private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException
+    {
+        final String mechanisms = new String(availableMechanisms, "utf8");
+        StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+        HashSet mechanismSet = new HashSet();
+        while (tokenizer.hasMoreTokens())
+        {
+            mechanismSet.add(tokenizer.nextToken());
+        }
+
+        String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
+        StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
+        while (prefTokenizer.hasMoreTokens())
+        {
+            String mech = prefTokenizer.nextToken();
+            if (mechanismSet.contains(mech))
+            {
+                return mech;
+            }
+        }
+        return null;
+    }
+
+    private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession)
+            throws AMQException
+    {
+        Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
+        try
+        {
+            Object instance = mechanismClass.newInstance();
+            AMQCallbackHandler cbh = (AMQCallbackHandler) instance;
+            cbh.initialise(protocolSession);
+            return cbh;
+        }
+        catch (Exception e)
+        {
+            throw new AMQException("Unable to create callback handler: " + e, e);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.AMQFrame;
+
+public class ConnectionTuneMethodHandler implements StateAwareMethodListener
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class);
+
+    private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler();
+
+    public static ConnectionTuneMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    protected ConnectionTuneMethodHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+    {
+        _logger.debug("ConnectionTune frame received");
+        ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
+        AMQProtocolSession session = evt.getProtocolSession();
+
+        ConnectionTuneParameters params = session.getConnectionTuneParameters();
+        if (params == null)
+        {
+            params = new ConnectionTuneParameters();
+        }
+
+        params.setFrameMax(frame.frameMax);        
+        params.setChannelMax(frame.channelMax);
+        params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
+        session.setConnectionTuneParameters(params);
+
+        stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+        session.writeFrame(createTuneOkFrame(evt.getChannelId(), params));
+        session.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true));
+    }
+
+    protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
+    {
+        return ConnectionOpenBody.createAMQFrame(channel, path, capabilities, insist);
+    }
+
+    protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params)
+    {
+        return ConnectionTuneOkBody.createAMQFrame(channel, params.getChannelMax(), params.getFrameMax(), params.getHeartbeat());
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.client.AMQSession;
+
+public class AMQMessage
+{
+    protected ContentHeaderProperties _contentHeaderProperties;
+
+    /**
+     * If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required
+     */
+    protected AMQSession _session;
+
+    protected final long _deliveryTag;
+
+    public AMQMessage(ContentHeaderProperties properties, long deliveryTag)
+    {
+        _contentHeaderProperties = properties;
+        _deliveryTag = deliveryTag;
+    }
+
+    public AMQMessage(ContentHeaderProperties properties)
+    {
+        this(properties, -1);
+    }
+
+    /**
+     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user
+     * calls acknowledge()
+     * @param s the AMQ session that delivered this message
+     */
+    public void setAMQSession(AMQSession s)
+    {
+        _session = s;
+    }
+
+    public AMQSession getAMQSession()
+    {
+        return _session;
+    }
+
+    /**
+     * Get the AMQ message number assigned to this message
+     * @return the message number
+     */
+    public long getDeliveryTag()
+    {
+        return _deliveryTag;
+    }       
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,677 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableKeyEnumeration;
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.Map;
+
+public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message
+{
+    private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+
+//    protected Map _messageProperties;
+
+    public static final char BOOLEAN_PROPERTY_PREFIX = 'B';
+    public static final char BYTE_PROPERTY_PREFIX = 'b';
+    public static final char SHORT_PROPERTY_PREFIX = 's';
+    public static final char INT_PROPERTY_PREFIX = 'i';
+    public static final char LONG_PROPERTY_PREFIX = 'l';
+    public static final char FLOAT_PROPERTY_PREFIX = 'f';
+    public static final char DOUBLE_PROPERTY_PREFIX = 'd';
+    public static final char STRING_PROPERTY_PREFIX = 'S';
+
+    protected boolean _redelivered;
+
+    protected ByteBuffer _data;
+
+    protected AbstractJMSMessage(ByteBuffer data)
+    {
+        super(new BasicContentHeaderProperties());
+        _data = data;
+        if (_data != null)
+        {
+            _data.acquire();
+        }
+    }
+
+    protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
+    {
+        this(contentHeader, deliveryTag);
+        _data = data;
+        if (_data != null)
+        {
+            _data.acquire();
+        }
+    }
+
+    protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag)
+    {
+        super(contentHeader, deliveryTag);
+    }
+
+    public String getJMSMessageID() throws JMSException
+    {
+        if (getJmsContentHeaderProperties().getMessageId() == null)
+        {
+            getJmsContentHeaderProperties().setMessageId("ID:" + _deliveryTag);
+        }
+        return getJmsContentHeaderProperties().getMessageId();
+    }
+
+    public void setJMSMessageID(String messageId) throws JMSException
+    {
+        getJmsContentHeaderProperties().setMessageId(messageId);
+    }
+
+    public long getJMSTimestamp() throws JMSException
+    {
+        return new Long(getJmsContentHeaderProperties().getTimestamp()).longValue();
+    }
+
+    public void setJMSTimestamp(long timestamp) throws JMSException
+    {
+        getJmsContentHeaderProperties().setTimestamp(timestamp);
+    }
+
+    public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+    {
+        return getJmsContentHeaderProperties().getCorrelationId().getBytes();
+    }
+
+    public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
+    {
+        getJmsContentHeaderProperties().setCorrelationId(new String(bytes));
+    }
+
+    public void setJMSCorrelationID(String correlationId) throws JMSException
+    {
+        getJmsContentHeaderProperties().setCorrelationId(correlationId);
+    }
+
+    public String getJMSCorrelationID() throws JMSException
+    {
+        return getJmsContentHeaderProperties().getCorrelationId();
+    }
+
+    public Destination getJMSReplyTo() throws JMSException
+    {
+        String replyToEncoding = getJmsContentHeaderProperties().getReplyTo();
+        if (replyToEncoding == null)
+        {
+            return null;
+        }
+        else
+        {
+            Destination dest = (Destination) _destinationCache.get(replyToEncoding);
+            if (dest == null)
+            {
+                char destType = replyToEncoding.charAt(0);
+                if (destType == 'Q')
+                {
+                    dest = new AMQQueue(replyToEncoding.substring(1));
+                }
+                else if (destType == 'T')
+                {
+                    dest = new AMQTopic(replyToEncoding.substring(1));
+                }
+                else
+                {
+                    throw new JMSException("Illegal value in JMS_ReplyTo property: " + replyToEncoding);
+                }
+                _destinationCache.put(replyToEncoding, dest);
+            }
+            return dest;
+        }
+    }
+
+    public void setJMSReplyTo(Destination destination) throws JMSException
+    {
+        if (destination == null)
+        {
+            throw new IllegalArgumentException("Null destination not allowed");
+        }
+        if (!(destination instanceof AMQDestination))
+        {
+            throw new IllegalArgumentException("ReplyTo destination my be an AMQ destination - passed argument was type " +
+                    destination.getClass());
+        }
+        final AMQDestination amqd = (AMQDestination) destination;
+
+        final String encodedDestination = amqd.getEncodedName();
+        _destinationCache.put(encodedDestination, destination);
+        getJmsContentHeaderProperties().setReplyTo(encodedDestination);
+    }
+
+    public Destination getJMSDestination() throws JMSException
+    {
+        // TODO: implement this once we have sorted out how to figure out the exchange class
+        throw new NotImplementedException();
+    }
+
+    public void setJMSDestination(Destination destination) throws JMSException
+    {
+        throw new NotImplementedException();
+    }
+
+    public int getJMSDeliveryMode() throws JMSException
+    {
+        return getJmsContentHeaderProperties().getDeliveryMode();
+    }
+
+    public void setJMSDeliveryMode(int i) throws JMSException
+    {
+        getJmsContentHeaderProperties().setDeliveryMode((byte) i);
+    }
+
+    public boolean getJMSRedelivered() throws JMSException
+    {
+        return _redelivered;
+    }
+
+    public void setJMSRedelivered(boolean b) throws JMSException
+    {
+        _redelivered = b;
+    }
+
+    public String getJMSType() throws JMSException
+    {
+        return getMimeType();
+    }
+
+    public void setJMSType(String string) throws JMSException
+    {
+        throw new JMSException("Cannot set JMS Type - it is implicitly defined based on message type");
+    }
+
+    public long getJMSExpiration() throws JMSException
+    {
+        return new Long(getJmsContentHeaderProperties().getExpiration()).longValue();
+    }
+
+    public void setJMSExpiration(long l) throws JMSException
+    {
+        getJmsContentHeaderProperties().setExpiration(l);
+    }
+
+    public int getJMSPriority() throws JMSException
+    {
+        return getJmsContentHeaderProperties().getPriority();
+    }
+
+    public void setJMSPriority(int i) throws JMSException
+    {
+        getJmsContentHeaderProperties().setPriority((byte) i);
+    }
+
+    public void clearProperties() throws JMSException
+    {
+        if (getJmsContentHeaderProperties().getHeaders() != null)
+        {
+            getJmsContentHeaderProperties().getHeaders().clear();
+        }
+    }
+
+    public boolean propertyExists(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            return false;
+        }
+        else
+        {
+            // TODO: fix this
+            return getJmsContentHeaderProperties().getHeaders().containsKey(STRING_PROPERTY_PREFIX + propertyName);
+        }
+    }
+
+    public boolean getBooleanProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            return Boolean.valueOf(null).booleanValue();
+        }
+        else
+        {
+            // store as integer as temporary workaround
+            //Boolean b = (Boolean) getJmsContentHeaderProperties().headers.get(BOOLEAN_PROPERTY_PREFIX + propertyName);
+            Long b = (Long) getJmsContentHeaderProperties().getHeaders().get(BOOLEAN_PROPERTY_PREFIX + propertyName);
+
+            if (b == null)
+            {
+                return Boolean.valueOf(null).booleanValue();
+            }
+            else
+            {
+                return b.longValue() != 0;
+            }
+        }
+    }
+
+    public byte getByteProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            return Byte.valueOf(null).byteValue();
+        }
+        else
+        {
+            Byte b = (Byte) getJmsContentHeaderProperties().getHeaders().get(BYTE_PROPERTY_PREFIX + propertyName);
+            if (b == null)
+            {
+                return Byte.valueOf(null).byteValue();
+            }
+            else
+            {
+                return b.byteValue();
+            }
+        }
+    }
+
+    public short getShortProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            return Short.valueOf(null).shortValue();
+        }
+        else
+        {
+            Short s = (Short) getJmsContentHeaderProperties().getHeaders().get(SHORT_PROPERTY_PREFIX + propertyName);
+            if (s == null)
+            {
+                return Short.valueOf(null).shortValue();
+            }
+            else
+            {
+                return s.shortValue();
+            }
+        }
+    }
+
+    public int getIntProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            return Integer.valueOf(null).intValue();
+        }
+        else
+        {
+            Integer i = (Integer) getJmsContentHeaderProperties().getHeaders().get(INT_PROPERTY_PREFIX + propertyName);
+            if (i == null)
+            {
+                return Integer.valueOf(null).intValue();
+            }
+            else
+            {
+                return i.intValue();
+            }
+        }
+    }
+
+    public long getLongProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            return Long.valueOf(null).longValue();
+        }
+        else
+        {
+            Long l = (Long) getJmsContentHeaderProperties().getHeaders().get(LONG_PROPERTY_PREFIX + propertyName);
+            if (l == null)
+            {
+                // temp - the spec says do this but this throws a NumberFormatException
+                //return Long.valueOf(null).longValue();
+                return 0;
+            }
+            else
+            {
+                return l.longValue();
+            }
+        }
+    }
+
+    public float getFloatProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            return Float.valueOf(null).floatValue();
+        }
+        else
+        {
+            final Float f = (Float) getJmsContentHeaderProperties().getHeaders().get(FLOAT_PROPERTY_PREFIX + propertyName);
+            if (f == null)
+            {
+                return Float.valueOf(null).floatValue();
+            }
+            else
+            {
+                return f.floatValue();
+            }
+        }
+    }
+
+    public double getDoubleProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            return Double.valueOf(null).doubleValue();
+        }
+        else
+        {
+            final Double d = (Double) getJmsContentHeaderProperties().getHeaders().get(DOUBLE_PROPERTY_PREFIX + propertyName);
+            if (d == null)
+            {
+                return Double.valueOf(null).doubleValue();
+            }
+            else
+            {
+                return d.shortValue();
+            }
+        }
+    }
+
+    public String getStringProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            return null;
+        }
+        else
+        {
+            return (String) getJmsContentHeaderProperties().getHeaders().get(STRING_PROPERTY_PREFIX + propertyName);
+        }
+    }
+
+    public Object getObjectProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        throw new JMSException("Not implemented yet");
+    }
+
+    public Enumeration getPropertyNames() throws JMSException
+    {
+        return new FieldTableKeyEnumeration(getJmsContentHeaderProperties().getHeaders())
+        {
+            public Object nextElement()
+            {
+                String propName = (String) _iterator.next();
+
+                //The propertyName has a single Char prefix. Skip this.
+                return propName.substring(1);
+            }
+        };
+    }
+
+    public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        //getJmsContentHeaderProperties().headers.put(BOOLEAN_PROPERTY_PREFIX + propertyName, Boolean.valueOf(b));
+        getJmsContentHeaderProperties().getHeaders().put(BOOLEAN_PROPERTY_PREFIX + propertyName, b ? new Long(1) : new Long(0));
+    }
+
+    public void setByteProperty(String propertyName, byte b) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        getJmsContentHeaderProperties().getHeaders().put(BYTE_PROPERTY_PREFIX + propertyName, new Byte(b));
+    }
+
+    public void setShortProperty(String propertyName, short i) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        getJmsContentHeaderProperties().getHeaders().put(SHORT_PROPERTY_PREFIX + propertyName, new Short(i));
+    }
+
+    public void setIntProperty(String propertyName, int i) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        getJmsContentHeaderProperties().getHeaders().put(INT_PROPERTY_PREFIX + propertyName, new Integer(i));
+    }
+
+    public void setLongProperty(String propertyName, long l) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        getJmsContentHeaderProperties().getHeaders().put(LONG_PROPERTY_PREFIX + propertyName, new Long(l));
+    }
+
+    public void setFloatProperty(String propertyName, float f) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        getJmsContentHeaderProperties().getHeaders().put(FLOAT_PROPERTY_PREFIX + propertyName, new Float(f));
+    }
+
+    public void setDoubleProperty(String propertyName, double v) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        getJmsContentHeaderProperties().getHeaders().put(DOUBLE_PROPERTY_PREFIX + propertyName, new Double(v));
+    }
+
+    public void setStringProperty(String propertyName, String value) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        createPropertyMapIfRequired();
+        propertyName = STRING_PROPERTY_PREFIX + propertyName;
+        getJmsContentHeaderProperties().getHeaders().put(propertyName, value);
+    }
+
+    private void createPropertyMapIfRequired()
+    {
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            getJmsContentHeaderProperties().setHeaders(new FieldTable());
+        }
+    }
+
+    public void setObjectProperty(String string, Object object) throws JMSException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void acknowledge() throws JMSException
+    {
+        // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+        // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+        if (_session != null)
+        {
+            // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+            // received on the session
+            _session.acknowledgeMessage(_deliveryTag, true);
+        }
+    }
+
+    public abstract void clearBody() throws JMSException;
+
+    /**
+     * Get a String representation of the body of the message. Used in the
+     * toString() method which outputs this before message properties.
+     */
+    public abstract String toBodyString() throws JMSException;
+
+    public abstract String getMimeType();
+
+    public String toString()
+    {
+        try
+        {
+            StringBuffer buf = new StringBuffer("Body:\n");
+            buf.append(toBodyString());
+            buf.append("\nJMS timestamp: ").append(getJMSTimestamp());
+            buf.append("\nJMS expiration: ").append(getJMSExpiration());
+            buf.append("\nJMS priority: ").append(getJMSPriority());
+            buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode());
+            buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
+            buf.append("\nAMQ message number: ").append(_deliveryTag);
+            buf.append("\nProperties:");
+            if (getJmsContentHeaderProperties().getHeaders() == null)
+            {
+                buf.append("<NONE>");
+            }
+            else
+            {
+                final Iterator it = getJmsContentHeaderProperties().getHeaders().entrySet().iterator();
+                while (it.hasNext())
+                {
+                    final Map.Entry entry = (Map.Entry) it.next();
+                    final String propertyName = (String) entry.getKey();
+                    if (propertyName == null)
+                    {
+                        buf.append("\nInternal error: Property with NULL key defined");
+                    }
+                    else
+                    {
+                        buf.append('\n').append(propertyName.substring(1));
+
+                        char typeIdentifier = propertyName.charAt(0);
+                        switch (typeIdentifier)
+                        {
+                            case org.apache.qpid.client.message.AbstractJMSMessage.BOOLEAN_PROPERTY_PREFIX:
+                                buf.append("<boolean> ");
+                                break;
+                            case org.apache.qpid.client.message.AbstractJMSMessage.BYTE_PROPERTY_PREFIX:
+                                buf.append("<byte> ");
+                                break;
+                            case org.apache.qpid.client.message.AbstractJMSMessage.SHORT_PROPERTY_PREFIX:
+                                buf.append("<short> ");
+                                break;
+                            case org.apache.qpid.client.message.AbstractJMSMessage.INT_PROPERTY_PREFIX:
+                                buf.append("<int> ");
+                                break;
+                            case org.apache.qpid.client.message.AbstractJMSMessage.LONG_PROPERTY_PREFIX:
+                                buf.append("<long> ");
+                                break;
+                            case org.apache.qpid.client.message.AbstractJMSMessage.FLOAT_PROPERTY_PREFIX:
+                                buf.append("<float> ");
+                                break;
+                            case org.apache.qpid.client.message.AbstractJMSMessage.DOUBLE_PROPERTY_PREFIX:
+                                buf.append("<double> ");
+                                break;
+                            case org.apache.qpid.client.message.AbstractJMSMessage.STRING_PROPERTY_PREFIX:
+                                buf.append("<string> ");
+                                break;
+                            default:
+                                buf.append("<unknown type (identifier " +
+                                        typeIdentifier + ") ");
+                        }
+                        buf.append(String.valueOf(entry.getValue()));
+                    }
+                }
+            }
+            return buf.toString();
+        }
+        catch (JMSException e)
+        {
+            return e.toString();
+        }
+    }
+
+    public Map getUnderlyingMessagePropertiesMap()
+    {
+        return getJmsContentHeaderProperties().getHeaders();
+    }
+
+    public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties)
+    {
+        getJmsContentHeaderProperties().setHeaders(messageProperties);
+    }
+
+    private void checkPropertyName(String propertyName)
+    {
+        if (propertyName == null)
+        {
+            throw new IllegalArgumentException("Property name must not be null");
+        }
+        else if ("".equals(propertyName))
+        {
+            throw new IllegalArgumentException("Property name must not be the empty string");
+        }
+
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            getJmsContentHeaderProperties().setHeaders(new FieldTable());
+        }
+    }
+
+    public FieldTable populateHeadersFromMessageProperties()
+    {
+        if (getJmsContentHeaderProperties().getHeaders() == null)
+        {
+            return null;
+        }
+        else
+        {
+            //
+            // We need to convert every property into a String representation
+            // Note that type information is preserved in the property name
+            //
+            final FieldTable table = new FieldTable();
+            final Iterator entries = getJmsContentHeaderProperties().getHeaders().entrySet().iterator();
+            while (entries.hasNext())
+            {
+                final Map.Entry entry = (Map.Entry) entries.next();
+                final String propertyName = (String) entry.getKey();
+                if (propertyName == null)
+                {
+                    continue;
+                }
+                else
+                {
+                    table.put(propertyName, entry.getValue().toString());
+                }
+            }
+            return table;
+        }
+    }
+
+    public BasicContentHeaderProperties getJmsContentHeaderProperties()
+    {
+        return (BasicContentHeaderProperties) _contentHeaderProperties;
+    }
+
+    public ByteBuffer getData()
+    {
+        // make sure we rewind the data just in case any method has moved the
+        // position beyond the start
+        if (_data != null)
+        {
+            _data.rewind();
+        }
+        return _data;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,77 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.jms.JMSException;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class AbstractJMSMessageFactory implements MessageFactory
+{
+    private static final Logger _logger = Logger.getLogger(AbstractJMSMessageFactory.class);
+
+
+    protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data,
+                                                                ContentHeaderBody contentHeader) throws AMQException;
+
+    protected AbstractJMSMessage createMessageWithBody(long messageNbr,
+                                                       ContentHeaderBody contentHeader,
+                                                       List bodies) throws AMQException
+    {
+        ByteBuffer data;
+
+        // we optimise the non-fragmented case to avoid copying
+        if (bodies != null && bodies.size() == 1)
+        {
+            _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize +")");
+            data = ((ContentBody)bodies.get(0)).payload;
+        }
+        else
+        {
+            _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize + ")");
+            data = ByteBuffer.allocate((int)contentHeader.bodySize); // XXX: Is cast a problem?
+            final Iterator it = bodies.iterator();
+            while (it.hasNext())
+            {
+                ContentBody cb = (ContentBody) it.next();
+                data.put(cb.payload);
+                cb.payload.release();
+            }
+            data.flip();
+        }
+        _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining());
+
+        return createMessage(messageNbr, data, contentHeader);
+    }
+
+    public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered,
+                                            ContentHeaderBody contentHeader,
+                                            List bodies) throws JMSException, AMQException
+    {
+        final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, bodies);
+        msg.setJMSRedelivered(redelivered);
+        return msg;
+    }
+
+}