You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/05/03 19:47:25 UTC

[1/3] qpid-proton git commit: PROTON-1188, PROTON-1189: remove stale contrib/proton-jms and contrib/proton-hawtdispatch modules

Repository: qpid-proton
Updated Branches:
  refs/heads/master 99a905615 -> 564a4d024


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java
deleted file mode 100644
index af27a77..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.UnsignedByte;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.messaging.*;
-
-import javax.jms.*;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class JMSMappingOutboundTransformer extends OutboundTransformer {
-
-    public JMSMappingOutboundTransformer(JMSVendor vendor) {
-        super(vendor);
-    }
-
-    @Override
-    public EncodedMessage transform(Message msg) throws Exception {
-        if( msg == null )
-            return null;
-        try {
-            if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
-                return null;
-            }
-        } catch (MessageFormatException e) {
-            return null;
-        }
-        ProtonJMessage amqp = convert(msg);
-
-        long messageFormat;
-        try {
-            messageFormat = msg.getLongProperty(this.messageFormatKey);
-        } catch (MessageFormatException e) {
-            return null;
-        }
-
-        ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
-        final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
-        int c = amqp.encode(new CompositeWritableBuffer(
-                new WritableBuffer.ByteBufferWrapper(buffer), overflow));
-        if( overflow.position() > 0 ) {
-            buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
-            c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
-        }
-
-        return new EncodedMessage(messageFormat, buffer.array(), 0, c);
-    }
-
-    /**
-     * Perform the conversion between JMS Message and Proton Message without re-encoding it to array.
-     * This is needed because some frameworks may elect to do this on their own way (Netty for instance using Nettybuffers)
-     *
-     * @param msg the supplied JMS Message
-     * @return the converted Proton Message
-     */
-    public ProtonJMessage convert(Message msg)
-            throws JMSException, UnsupportedEncodingException {
-        Header header = new Header();
-        Properties props=new Properties();
-        HashMap<Symbol, Object> daMap = null;
-        HashMap<Symbol, Object> maMap = null;
-        HashMap apMap = null;
-        Section body=null;
-        HashMap footerMap = null;
-        if( msg instanceof BytesMessage ) {
-            BytesMessage m = (BytesMessage)msg;
-            byte data[] = new byte[(int) m.getBodyLength()];
-            m.readBytes(data);
-            m.reset(); //Need to reset after readBytes or future readBytes calls (ex: redeliveries) will fail and return -1
-            body = new Data(new Binary(data));
-        } if( msg instanceof TextMessage ) {
-            body = new AmqpValue(((TextMessage) msg).getText());
-        } if( msg instanceof MapMessage ) {
-            final HashMap map = new HashMap();
-            final MapMessage m = (MapMessage) msg;
-            final Enumeration names = m.getMapNames();
-            while (names.hasMoreElements()) {
-                String key = (String) names.nextElement();
-                map.put(key, m.getObject(key));
-            }
-            body = new AmqpValue(map);
-        } if( msg instanceof StreamMessage ) {
-            ArrayList list = new ArrayList();
-            final StreamMessage m = (StreamMessage) msg;
-            try {
-                while(true) {
-                    list.add(m.readObject());
-                }
-            } catch(MessageEOFException e){}
-            body = new AmqpSequence(list);
-        } if( msg instanceof ObjectMessage ) {
-            body = new AmqpValue(((ObjectMessage) msg).getObject());
-        }
-
-        header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
-        header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
-        if( msg.getJMSType()!=null ) {
-            if( maMap==null ) maMap = new HashMap<Symbol, Object>();
-            maMap.put(Symbol.valueOf("x-opt-jms-type"), msg.getJMSType());
-        }
-        if( msg.getJMSMessageID()!=null ) {
-            props.setMessageId(msg.getJMSMessageID());
-        }
-        if( msg.getJMSDestination()!=null ) {
-            props.setTo(vendor.toAddress(msg.getJMSDestination()));
-            if( maMap==null ) maMap = new HashMap();
-            maMap.put(Symbol.valueOf("x-opt-to-type"), destinationAttributes(msg.getJMSDestination()));
-        }
-        if( msg.getJMSReplyTo()!=null ) {
-            props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
-            if( maMap==null ) maMap = new HashMap();
-            maMap.put(Symbol.valueOf("x-opt-reply-type"), destinationAttributes(msg.getJMSReplyTo()));
-        }
-        if( msg.getJMSCorrelationID()!=null ) {
-            props.setCorrelationId(msg.getJMSCorrelationID());
-        }
-        if( msg.getJMSExpiration() != 0 ) {
-            long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
-            if (ttl < 0) {
-                ttl = 1;
-            }
-            header.setTtl(new UnsignedInteger((int)ttl));
-
-            props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
-        }
-        if( msg.getJMSTimestamp()!= 0 ) {
-            props.setCreationTime(new Date(msg.getJMSTimestamp()));
-        }
-
-        final Enumeration keys = msg.getPropertyNames();
-        while (keys.hasMoreElements()) {
-            String key = (String) keys.nextElement();
-            if( key.equals(messageFormatKey) || key.equals(nativeKey)) {
-                // skip..
-            } else if( key.equals(firstAcquirerKey) ) {
-                header.setFirstAcquirer(msg.getBooleanProperty(key));
-            } else if( key.startsWith("JMSXDeliveryCount") ) {
-                // The AMQP delivery-count field only includes prior failed delivery attempts,
-                // whereas JMSXDeliveryCount includes the first/current delivery attempt.
-                int amqpDeliveryCount = msg.getIntProperty(key) - 1;
-                if( amqpDeliveryCount > 0 ) {
-                    header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
-                }
-            } else if( key.startsWith("JMSXUserID") ) {
-                String value = msg.getStringProperty(key);
-                props.setUserId(new Binary(value.getBytes("UTF-8")));
-            } else if( key.startsWith("JMSXGroupID") ) {
-                String value = msg.getStringProperty(key);
-                props.setGroupId(value);
-                if( apMap==null ) apMap = new HashMap();
-                apMap.put(key, value);
-            } else if( key.startsWith("JMSXGroupSeq") ) {
-                UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
-                props.setGroupSequence(value);
-                if( apMap==null ) apMap = new HashMap();
-                apMap.put(key, value);
-            } else if( key.startsWith(prefixDeliveryAnnotationsKey) ) {
-                if( daMap == null ) daMap = new HashMap<Symbol, Object>();
-                String name = key.substring(prefixDeliveryAnnotationsKey.length());
-                daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
-            } else if( key.startsWith(prefixMessageAnnotationsKey) ) {
-                if( maMap==null ) maMap = new HashMap<Symbol, Object>();
-                String name = key.substring(prefixMessageAnnotationsKey.length());
-                maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
-            } else if( key.equals(subjectKey) ) {
-                props.setSubject(msg.getStringProperty(key));
-            } else if( key.equals(contentTypeKey) ) {
-                props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
-            } else if( key.equals(contentEncodingKey) ) {
-                props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
-            } else if( key.equals(replyToGroupIDKey) ) {
-                props.setReplyToGroupId(msg.getStringProperty(key));
-            } else if( key.startsWith(prefixFooterKey) ) {
-                if( footerMap==null ) footerMap = new HashMap();
-                String name = key.substring(prefixFooterKey.length());
-                footerMap.put(name, msg.getObjectProperty(key));
-            } else {
-                if( apMap==null ) apMap = new HashMap();
-                apMap.put(key, msg.getObjectProperty(key));
-            }
-        }
-
-
-        MessageAnnotations ma=null;
-        if( maMap!=null ) ma = new MessageAnnotations(maMap);
-        DeliveryAnnotations da=null;
-        if( daMap!=null ) da = new DeliveryAnnotations(daMap);
-        ApplicationProperties ap=null;
-        if( apMap!=null ) ap = new ApplicationProperties(apMap);
-        Footer footer=null;
-        if( footerMap!=null ) footer = new Footer(footerMap);
-
-        return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
-    }
-
-    private static String destinationAttributes(Destination destination) {
-        if( destination instanceof Queue ) {
-            if( destination instanceof TemporaryQueue ) {
-                return "temporary,queue";
-            } else {
-                return "queue";
-            }
-        }
-        if( destination instanceof Topic ) {
-            if( destination instanceof TemporaryTopic ) {
-                return "temporary,topic";
-            } else {
-                return "topic";
-            }
-        }
-        return "";
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSVendor.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSVendor.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSVendor.java
deleted file mode 100644
index 60275bd..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSVendor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import javax.jms.*;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract public class JMSVendor {
-
-    public abstract BytesMessage createBytesMessage();
-
-    public abstract StreamMessage createStreamMessage();
-
-    public abstract Message createMessage();
-
-    public abstract TextMessage createTextMessage();
-
-    public abstract ObjectMessage createObjectMessage();
-
-    public abstract MapMessage createMapMessage();
-
-    public abstract void setJMSXUserID(Message msg, String value);
-
-    @Deprecated
-    public Destination createDestination(String name) {
-        return null;
-    }
-
-    @SuppressWarnings("deprecation")
-    public <T extends Destination> T createDestination(String name, Class<T> kind) {
-        return kind.cast(createDestination(name));
-    }
-
-    public abstract void setJMSXGroupID(Message msg, String groupId);
-
-    public abstract void setJMSXGroupSequence(Message msg, int i);
-
-    public abstract void setJMSXDeliveryCount(Message rc, long l);
-
-    public abstract String toAddress(Destination msgDestination);
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/OutboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/OutboundTransformer.java
deleted file mode 100644
index 09a6e15..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/OutboundTransformer.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.engine.Delivery;
-
-import javax.jms.Message;
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public abstract class OutboundTransformer {
-
-    JMSVendor vendor;
-    String prefixVendor;
-
-    String prefixDeliveryAnnotations = "DA_";
-    String prefixMessageAnnotations= "MA_";
-    String prefixFooter = "FT_";
-
-    String messageFormatKey;
-    String nativeKey;
-    String firstAcquirerKey;
-    String prefixDeliveryAnnotationsKey;
-    String prefixMessageAnnotationsKey;
-    String subjectKey;
-    String contentTypeKey;
-    String contentEncodingKey;
-    String replyToGroupIDKey;
-    String prefixFooterKey;
-
-
-
-   public OutboundTransformer(JMSVendor vendor) {
-        this.vendor = vendor;
-        this.setPrefixVendor("JMS_AMQP_");
-    }
-
-    public abstract EncodedMessage transform(Message jms) throws Exception;
-
-    public String getPrefixVendor() {
-        return prefixVendor;
-    }
-
-    public void setPrefixVendor(String prefixVendor) {
-        this.prefixVendor = prefixVendor;
-
-        messageFormatKey = prefixVendor + "MESSAGE_FORMAT";
-        nativeKey = prefixVendor + "NATIVE";
-        firstAcquirerKey = prefixVendor + "FirstAcquirer";
-        prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
-        prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
-        subjectKey =  prefixVendor +"Subject";
-        contentTypeKey = prefixVendor +"ContentType";
-        contentEncodingKey = prefixVendor +"ContentEncoding";
-        replyToGroupIDKey = prefixVendor +"ReplyToGroupID";
-        prefixFooterKey = prefixVendor + prefixFooter;
-
-    }
-
-    public JMSVendor getVendor() {
-        return vendor;
-    }
-
-    public void setVendor(JMSVendor vendor) {
-        this.vendor = vendor;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/test/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/test/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformerTest.java b/contrib/proton-jms/src/test/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformerTest.java
deleted file mode 100644
index 42a99ca..0000000
--- a/contrib/proton-jms/src/test/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformerTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import static org.junit.Assert.*;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.message.Message;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class JMSMappingInboundTransformerTest
-{
-    @Test
-    public void testTransformMessageWithAmqpValueStringCreatesTextMessage() throws Exception
-    {
-        TextMessage mockTextMessage = createMockTextMessage();
-        JMSVendor mockVendor = createMockVendor(mockTextMessage);
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
-
-        String contentString = "myTextMessageContent";
-        Message amqp = Message.Factory.create();
-        amqp.setBody(new AmqpValue(contentString));
-
-        EncodedMessage em = encodeMessage(amqp);
-
-        javax.jms.Message jmsMessage = transformer.transform(em);
-
-        assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
-        Mockito.verify(mockTextMessage).setText(contentString);
-        assertSame("Expected provided mock message, got a different one", mockTextMessage, jmsMessage);
-    }
-
-    // ======= JMSDestination Handling =========
-    // =========================================
-
-    @Test
-    public void testTransformWithNoToTypeDestinationTypeAnnotation() throws Exception
-    {
-        doTransformWithToTypeDestinationTypeAnnotationTestImpl(null, Destination.class);
-    }
-
-    @Test
-    public void testTransformWithQueueStringToTypeDestinationTypeAnnotation() throws Exception
-    {
-        doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue", Queue.class);
-    }
-
-    @Test
-    public void testTransformWithTemporaryQueueStringToTypeDestinationTypeAnnotation() throws Exception
-    {
-        doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue,temporary", TemporaryQueue.class);
-    }
-
-    @Test
-    public void testTransformWithTopicStringToTypeDestinationTypeAnnotation() throws Exception
-    {
-        doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic", Topic.class);
-    }
-
-    @Test
-    public void testTransformWithTemporaryTopicStringToTypeDestinationTypeAnnotation() throws Exception
-    {
-        doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic,temporary", TemporaryTopic.class);
-    }
-
-    private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception
-    {
-        TextMessage mockTextMessage = createMockTextMessage();
-        JMSVendor mockVendor = createMockVendor(mockTextMessage);
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
-
-        String toAddress = "toAddress";
-        Message amqp = Message.Factory.create();
-        amqp.setBody(new AmqpValue("myTextMessageContent"));
-        amqp.setAddress(toAddress);
-        if(toTypeAnnotationValue != null)
-        {
-            Map<Symbol, Object> map = new HashMap<Symbol, Object>();
-            map.put(Symbol.valueOf("x-opt-to-type"), toTypeAnnotationValue);
-            MessageAnnotations ma = new MessageAnnotations(map);
-            amqp.setMessageAnnotations(ma);
-        }
-
-        EncodedMessage em = encodeMessage(amqp);
-
-        javax.jms.Message jmsMessage = transformer.transform(em);
-        assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
-
-        // Verify that createDestination was called with the provided 'to' address and 'Destination' class
-        Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
-    }
-
-    // ======= JMSReplyTo Handling =========
-    // =====================================
-
-    @Test
-    public void testTransformWithNoReplyToTypeDestinationTypeAnnotation() throws Exception
-    {
-        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(null,Destination.class);
-    }
-
-    @Test
-    public void testTransformWithQueueStringReplyToTypeDestinationTypeAnnotation() throws Exception
-    {
-        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue", Queue.class);
-    }
-
-    @Test
-    public void testTransformWithTemporaryQueueStringReplyToTypeDestinationTypeAnnotation() throws Exception
-    {
-        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue,temporary", TemporaryQueue.class);
-    }
-
-    @Test
-    public void testTransformWithTopicStringReplyToTypeDestinationTypeAnnotation() throws Exception
-    {
-        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic", Topic.class);
-    }
-
-    @Test
-    public void testTransformWithTemporaryTopicStringReplyToTypeDestinationTypeAnnotation() throws Exception
-    {
-        doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic,temporary", TemporaryTopic.class);
-    }
-
-    private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception
-    {
-        TextMessage mockTextMessage = createMockTextMessage();
-        JMSVendor mockVendor = createMockVendor(mockTextMessage);
-        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
-
-        String replyToAddress = "replyToAddress";
-        Message amqp = Message.Factory.create();
-        amqp.setBody(new AmqpValue("myTextMessageContent"));
-        amqp.setReplyTo(replyToAddress);
-        if(replyToTypeAnnotationValue != null)
-        {
-            Map<Symbol, Object> map = new HashMap<Symbol, Object>();
-            map.put(Symbol.valueOf("x-opt-reply-type"), replyToTypeAnnotationValue);
-            MessageAnnotations ma = new MessageAnnotations(map);
-            amqp.setMessageAnnotations(ma);
-        }
-
-        EncodedMessage em = encodeMessage(amqp);
-
-        javax.jms.Message jmsMessage = transformer.transform(em);
-        assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
-
-        // Verify that createDestination was called with the provided 'replyTo' address and 'Destination' class
-        Mockito.verify(mockVendor).createDestination(replyToAddress,expectedClass);
-    }
-
-    // ======= Utility Methods =========
-    // =================================
-
-    private TextMessage createMockTextMessage()
-    {
-        TextMessage mockTextMessage = Mockito.mock(TextMessage.class);
-
-        return mockTextMessage;
-    }
-
-    private JMSVendor createMockVendor(TextMessage mockTextMessage)
-    {
-        JMSVendor mockVendor = Mockito.mock(JMSVendor.class);
-        Mockito.when(mockVendor.createTextMessage()).thenReturn(mockTextMessage);
-
-        return mockVendor;
-    }
-
-    private EncodedMessage encodeMessage(Message message)
-    {
-        byte[] encodeBuffer = new byte[1024 * 8];
-        int encodedSize;
-        while (true) {
-            try {
-                encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
-                break;
-            } catch (java.nio.BufferOverflowException e) {
-                encodeBuffer = new byte[encodeBuffer.length * 2];
-            }
-        }
-
-        long messageFormat = 0;
-        return new EncodedMessage(messageFormat, encodeBuffer, 0, encodedSize);
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/test/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/test/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformerTest.java b/contrib/proton-jms/src/test/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformerTest.java
deleted file mode 100644
index 98dae5f..0000000
--- a/contrib/proton-jms/src/test/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformerTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import static org.junit.Assert.*;
-
-import java.util.Collections;
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.message.Message;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class JMSMappingOutboundTransformerTest
-{
-    @Test
-    public void testConvertMessageWithTextMessageCreatesAmqpValueStringBody() throws Exception
-    {
-        String contentString = "myTextMessageContent";
-        TextMessage mockTextMessage = createMockTextMessage();
-        Mockito.when(mockTextMessage.getText()).thenReturn(contentString);
-        JMSVendor mockVendor = createMockVendor();
-
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
-
-        Message amqp = transformer.convert(mockTextMessage);
-
-        assertNotNull(amqp.getBody());
-        assertTrue(amqp.getBody() instanceof AmqpValue);
-        assertEquals(contentString, ((AmqpValue) amqp.getBody()).getValue());
-    }
-
-    // ======= JMSDestination Handling =========
-    // =========================================
-
-    @Test
-    public void testConvertMessageWithJMSDestinationNull() throws Exception
-    {
-        doTestConvertMessageWithJMSDestination(null, null);
-    }
-
-    @Test
-    public void testConvertMessageWithJMSDestinationQueue() throws Exception
-    {
-        Queue mockDest = Mockito.mock(Queue.class);
-
-        doTestConvertMessageWithJMSDestination(mockDest, "queue");
-    }
-
-    @Test
-    public void testConvertMessageWithJMSDestinationTemporaryQueue() throws Exception
-    {
-        TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
-
-        doTestConvertMessageWithJMSDestination(mockDest, "temporary,queue");
-    }
-
-    @Test
-    public void testConvertMessageWithJMSDestinationTopic() throws Exception
-    {
-        Topic mockDest = Mockito.mock(Topic.class);
-
-        doTestConvertMessageWithJMSDestination(mockDest, "topic");
-    }
-
-    @Test
-    public void testConvertMessageWithJMSDestinationTemporaryTopic() throws Exception
-    {
-        TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
-
-        doTestConvertMessageWithJMSDestination(mockDest, "temporary,topic");
-    }
-
-    private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue) throws Exception
-    {
-        TextMessage mockTextMessage = createMockTextMessage();
-        Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
-        Mockito.when(mockTextMessage.getJMSDestination()).thenReturn(jmsDestination);
-        JMSVendor mockVendor = createMockVendor();
-        String toAddress = "someToAddress";
-        if(jmsDestination != null)
-        {
-            Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class)))
-                    .thenReturn(toAddress);
-        }
-
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
-
-        Message amqp = transformer.convert(mockTextMessage);
-
-        MessageAnnotations ma = amqp.getMessageAnnotations();
-        Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
-        if(maMap != null)
-        {
-            Object actualValue = maMap.get(Symbol.valueOf("x-opt-to-type"));
-            assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
-        }
-        else if (expectedAnnotationValue != null)
-        {
-            fail("Expected annotation value, but there were no annotations");
-        }
-
-        if(jmsDestination != null)
-        {
-            assertEquals("Unexpected 'to' address", toAddress, amqp.getAddress());
-        }
-    }
-
-    // ======= JMSReplyTo Handling =========
-    // =====================================
-
-    @Test
-    public void testConvertMessageWithJMSReplyToNull() throws Exception
-    {
-        doTestConvertMessageWithJMSReplyTo(null, null);
-    }
-
-    @Test
-    public void testConvertMessageWithJMSReplyToQueue() throws Exception
-    {
-        Queue mockDest = Mockito.mock(Queue.class);
-
-        doTestConvertMessageWithJMSReplyTo(mockDest, "queue");
-    }
-
-    @Test
-    public void testConvertMessageWithJMSReplyToTemporaryQueue() throws Exception
-    {
-        TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
-
-        doTestConvertMessageWithJMSReplyTo(mockDest, "temporary,queue");
-    }
-
-    @Test
-    public void testConvertMessageWithJMSReplyToTopic() throws Exception
-    {
-        Topic mockDest = Mockito.mock(Topic.class);
-
-        doTestConvertMessageWithJMSReplyTo(mockDest, "topic");
-    }
-
-    @Test
-    public void testConvertMessageWithJMSReplyToTemporaryTopic() throws Exception
-    {
-        TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
-
-        doTestConvertMessageWithJMSReplyTo(mockDest, "temporary,topic");
-    }
-
-    private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue) throws Exception
-    {
-        TextMessage mockTextMessage = createMockTextMessage();
-        Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
-        Mockito.when(mockTextMessage.getJMSReplyTo()).thenReturn(jmsReplyTo);
-        JMSVendor mockVendor = createMockVendor();
-        String replyToAddress = "someReplyToAddress";
-        if(jmsReplyTo != null)
-        {
-            Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class)))
-                    .thenReturn(replyToAddress);
-        }
-
-        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
-
-        Message amqp = transformer.convert(mockTextMessage);
-
-        MessageAnnotations ma = amqp.getMessageAnnotations();
-        Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
-        if(maMap != null)
-        {
-            Object actualValue = maMap.get(Symbol.valueOf("x-opt-reply-type"));
-            assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
-        }
-        else if (expectedAnnotationValue != null)
-        {
-            fail("Expected annotation value, but there were no annotations");
-        }
-
-        if(jmsReplyTo != null)
-        {
-            assertEquals("Unexpected 'reply-to' address", replyToAddress, amqp.getReplyTo());
-        }
-    }
-
-    // ======= Utility Methods =========
-    // =================================
-
-    private TextMessage createMockTextMessage() throws Exception
-    {
-        TextMessage mockTextMessage = Mockito.mock(TextMessage.class);
-        Mockito.when(mockTextMessage.getPropertyNames()).thenReturn(Collections.enumeration(Collections.emptySet()));
-
-        return mockTextMessage;
-    }
-
-    private JMSVendor createMockVendor()
-    {
-        JMSVendor mockVendor = Mockito.mock(JMSVendor.class);
-
-        return mockVendor;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7bacd41..17c39d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,8 +135,6 @@
 
   <modules>
     <module>proton-j</module>
-    <module>contrib/proton-jms</module>
-    <module>contrib/proton-hawtdispatch</module>
     <module>tests</module>
     <module>examples/engine/java</module>
     <module>examples/java/messenger</module>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-proton git commit: PROTON-1188, PROTON-1189: remove stale contrib/proton-jms and contrib/proton-hawtdispatch modules

Posted by ro...@apache.org.
PROTON-1188, PROTON-1189: remove stale contrib/proton-jms and contrib/proton-hawtdispatch modules


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/564a4d02
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/564a4d02
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/564a4d02

Branch: refs/heads/master
Commit: 564a4d024a42df7911b87a3ae7a3cdc43ac8194f
Parents: 99a9056
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue May 3 18:44:59 2016 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue May 3 18:46:48 2016 +0100

----------------------------------------------------------------------
 contrib/proton-hawtdispatch/pom.xml             |  61 --
 .../hawtdispatch/api/AmqpConnectOptions.java    | 228 --------
 .../proton/hawtdispatch/api/AmqpConnection.java | 201 -------
 .../hawtdispatch/api/AmqpDeliveryListener.java  |  32 -
 .../hawtdispatch/api/AmqpEndpointBase.java      | 158 -----
 .../qpid/proton/hawtdispatch/api/AmqpLink.java  |  27 -
 .../proton/hawtdispatch/api/AmqpReceiver.java   | 141 -----
 .../proton/hawtdispatch/api/AmqpSender.java     | 227 -------
 .../proton/hawtdispatch/api/AmqpSession.java    | 141 -----
 .../qpid/proton/hawtdispatch/api/Callback.java  |  29 -
 .../hawtdispatch/api/ChainedCallback.java       |  37 --
 .../hawtdispatch/api/DeliveryAttachment.java    |  27 -
 .../qpid/proton/hawtdispatch/api/Future.java    |  31 -
 .../hawtdispatch/api/MessageDelivery.java       | 226 -------
 .../qpid/proton/hawtdispatch/api/Promise.java   | 107 ----
 .../qpid/proton/hawtdispatch/api/QoS.java       |  26 -
 .../proton/hawtdispatch/api/TransportState.java |  29 -
 .../proton/hawtdispatch/impl/AmqpHeader.java    |  85 ---
 .../proton/hawtdispatch/impl/AmqpListener.java  |  71 ---
 .../hawtdispatch/impl/AmqpProtocolCodec.java    | 109 ----
 .../proton/hawtdispatch/impl/AmqpTransport.java | 586 -------------------
 .../qpid/proton/hawtdispatch/impl/Defer.java    |  27 -
 .../hawtdispatch/impl/EndpointContext.java      |  76 ---
 .../qpid/proton/hawtdispatch/impl/Support.java  |  41 --
 .../qpid/proton/hawtdispatch/impl/Watch.java    |  26 -
 .../proton/hawtdispatch/impl/WatchBase.java     |  54 --
 .../proton/hawtdispatch/api/SampleTest.java     | 308 ----------
 .../hawtdispatch/test/MessengerServer.java      | 151 -----
 contrib/proton-jms/pom.xml                      |  50 --
 .../jms/AMQPNativeInboundTransformer.java       |  40 --
 .../jms/AMQPNativeOutboundTransformer.java      | 111 ----
 .../proton/jms/AMQPRawInboundTransformer.java   |  47 --
 .../proton/jms/AutoOutboundTransformer.java     |  49 --
 .../apache/qpid/proton/jms/EncodedMessage.java  |  75 ---
 .../qpid/proton/jms/InboundTransformer.java     | 314 ----------
 .../jms/JMSMappingInboundTransformer.java       | 102 ----
 .../jms/JMSMappingOutboundTransformer.java      | 243 --------
 .../org/apache/qpid/proton/jms/JMSVendor.java   |  61 --
 .../qpid/proton/jms/OutboundTransformer.java    |  82 ---
 .../jms/JMSMappingInboundTransformerTest.java   | 214 -------
 .../jms/JMSMappingOutboundTransformerTest.java  | 226 -------
 pom.xml                                         |   2 -
 42 files changed, 4878 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/pom.xml b/contrib/proton-hawtdispatch/pom.xml
deleted file mode 100644
index bffcad4..0000000
--- a/contrib/proton-hawtdispatch/pom.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-  
-  http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <parent>
-    <groupId>org.apache.qpid</groupId>
-    <artifactId>proton-project</artifactId>
-    <version>0.13.0-SNAPSHOT</version>
-    <relativePath>../..</relativePath>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>proton-hawtdispatch</artifactId>
-  <name>proton-hawtdispatch</name>
-
-  <properties>
-    <hawtbuf-version>1.9</hawtbuf-version>
-    <hawtdispatch-version>1.18</hawtdispatch-version>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.qpid</groupId>
-      <artifactId>proton-j</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.fusesource.hawtdispatch</groupId>
-      <artifactId>hawtdispatch-transport</artifactId>
-      <version>${hawtdispatch-version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.fusesource.hawtbuf</groupId>
-      <artifactId>hawtbuf</artifactId>
-      <version>${hawtbuf-version}</version>
-      <scope>provided</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-  </build>
-  <scm>
-    <url>http://svn.apache.org/viewvc/qpid/proton/</url>
-  </scm>
-
-</project>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
deleted file mode 100644
index 3c3543d..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.transport.TcpTransport;
-
-import javax.net.ssl.SSLContext;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.*;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpConnectOptions implements Cloneable {
-
-    private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("amqp.thread.keep_alive", ""+1000));
-    private static final long STACK_SIZE = Long.parseLong(System.getProperty("amqp.thread.stack_size", ""+1024*512));
-    private static ThreadPoolExecutor blockingThreadPool;
-
-    public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
-        if( blockingThreadPool == null ) {
-            blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
-                    public Thread newThread(Runnable r) {
-                        Thread rc = new Thread(null, r, "AMQP Task", STACK_SIZE);
-                        rc.setDaemon(true);
-                        return rc;
-                    }
-                }) {
-
-                    @Override
-                    public void shutdown() {
-                        // we don't ever shutdown since we are shared..
-                    }
-
-                    @Override
-                    public List<Runnable> shutdownNow() {
-                        // we don't ever shutdown since we are shared..
-                        return Collections.emptyList();
-                    }
-                };
-        }
-        return blockingThreadPool;
-    }
-    public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
-        blockingThreadPool = pool;
-    }
-
-    private static final URI DEFAULT_HOST;
-    static {
-        try {
-            DEFAULT_HOST = new URI("tcp://localhost");
-        } catch (URISyntaxException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    URI host = DEFAULT_HOST;
-    URI localAddress;
-    SSLContext sslContext;
-    DispatchQueue dispatchQueue;
-    Executor blockingExecutor;
-    int maxReadRate;
-    int maxWriteRate;
-    int trafficClass = TcpTransport.IPTOS_THROUGHPUT;
-    boolean useLocalHost;
-    int receiveBufferSize = 1024*64;
-    int sendBufferSize = 1024*64;
-    String localContainerId;
-    String remoteContainerId;
-    String user;
-    String password;
-
-
-    @Override
-    public AmqpConnectOptions clone() {
-        try {
-            return (AmqpConnectOptions) super.clone();
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public String getLocalContainerId() {
-        return localContainerId;
-    }
-
-    public void setLocalContainerId(String localContainerId) {
-        this.localContainerId = localContainerId;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public String getRemoteContainerId() {
-        return remoteContainerId;
-    }
-
-    public void setRemoteContainerId(String remoteContainerId) {
-        this.remoteContainerId = remoteContainerId;
-    }
-
-    public String getUser() {
-        return user;
-    }
-
-    public void setUser(String user) {
-        this.user = user;
-    }
-
-    public Executor getBlockingExecutor() {
-        return blockingExecutor;
-    }
-
-    public void setBlockingExecutor(Executor blockingExecutor) {
-        this.blockingExecutor = blockingExecutor;
-    }
-
-    public DispatchQueue getDispatchQueue() {
-        return dispatchQueue;
-    }
-
-    public void setDispatchQueue(DispatchQueue dispatchQueue) {
-        this.dispatchQueue = dispatchQueue;
-    }
-
-    public URI getLocalAddress() {
-        return localAddress;
-    }
-
-    public void setLocalAddress(String localAddress) throws URISyntaxException {
-        this.setLocalAddress(new URI(localAddress));
-    }
-    public void setLocalAddress(URI localAddress) {
-        this.localAddress = localAddress;
-    }
-
-    public int getMaxReadRate() {
-        return maxReadRate;
-    }
-
-    public void setMaxReadRate(int maxReadRate) {
-        this.maxReadRate = maxReadRate;
-    }
-
-    public int getMaxWriteRate() {
-        return maxWriteRate;
-    }
-
-    public void setMaxWriteRate(int maxWriteRate) {
-        this.maxWriteRate = maxWriteRate;
-    }
-
-    public int getReceiveBufferSize() {
-        return receiveBufferSize;
-    }
-
-    public void setReceiveBufferSize(int receiveBufferSize) {
-        this.receiveBufferSize = receiveBufferSize;
-    }
-
-    public URI getHost() {
-        return host;
-    }
-    public void setHost(String host, int port) throws URISyntaxException {
-        this.setHost(new URI("tcp://"+host+":"+port));
-    }
-    public void setHost(String host) throws URISyntaxException {
-        this.setHost(new URI(host));
-    }
-    public void setHost(URI host) {
-        this.host = host;
-    }
-
-    public int getSendBufferSize() {
-        return sendBufferSize;
-    }
-
-    public void setSendBufferSize(int sendBufferSize) {
-        this.sendBufferSize = sendBufferSize;
-    }
-
-    public SSLContext getSslContext() {
-        return sslContext;
-    }
-
-    public void setSslContext(SSLContext sslContext) {
-        this.sslContext = sslContext;
-    }
-
-    public int getTrafficClass() {
-        return trafficClass;
-    }
-
-    public void setTrafficClass(int trafficClass) {
-        this.trafficClass = trafficClass;
-    }
-
-    public boolean isUseLocalHost() {
-        return useLocalHost;
-    }
-
-    public void setUseLocalHost(boolean useLocalHost) {
-        this.useLocalHost = useLocalHost;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
deleted file mode 100644
index b308209..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.hawtdispatch.impl.AmqpListener;
-import org.apache.qpid.proton.hawtdispatch.impl.AmqpTransport;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.ProtonJConnection;
-import org.apache.qpid.proton.engine.ProtonJSession;
-import org.apache.qpid.proton.engine.impl.ProtocolTracer;
-import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.Task;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpConnection extends AmqpEndpointBase  {
-
-    AmqpTransport transport;
-    ProtonJConnection connection;
-    HashSet<AmqpSender> senders = new HashSet<AmqpSender>();
-    boolean closing = false;
-
-    public static AmqpConnection connect(AmqpConnectOptions options) {
-        return new AmqpConnection(options);
-    }
-
-    private AmqpConnection(AmqpConnectOptions options) {
-        transport = AmqpTransport.connect(options);
-        transport.setListener(new AmqpListener() {
-            @Override
-            public void processDelivery(Delivery delivery) {
-                Attachment attachment = (Attachment) getTransport().context(delivery.getLink()).getAttachment();
-                AmqpLink link = (AmqpLink) attachment.endpoint();
-                link.processDelivery(delivery);
-            }
-
-            @Override
-            public void processRefill() {
-                for(AmqpSender sender: new ArrayList<AmqpSender>(senders)) {
-                    sender.pumpDeliveries();
-                }
-                pumpOut();
-            }
-
-            public void processTransportFailure(final IOException e) {
-            }
-        });
-        connection = transport.connection();
-        connection.open();
-        attach();
-    }
-
-    public void waitForConnected() throws Exception {
-        assertNotOnDispatchQueue();
-        getConnectedFuture().await();
-    }
-
-    public Future<Void> getConnectedFuture() {
-        final Promise<Void> rc = new Promise<Void>();
-        queue().execute(new Task() {
-            @Override
-            public void run() {
-                onConnected(rc);
-            }
-        });
-        return rc;
-    }
-
-    public void onConnected(Callback<Void> cb) {
-        transport.onTransportConnected(cb);
-    }
-
-    @Override
-    protected Endpoint getEndpoint() {
-        return connection;
-    }
-
-    @Override
-    protected AmqpConnection getConnection() {
-        return this;
-    }
-
-    @Override
-    protected AmqpEndpointBase getParent() {
-        return null;
-    }
-
-    public AmqpSession createSession() {
-        assertExecuting();
-        ProtonJSession session = connection.session();
-        session.open();
-        pumpOut();
-        return new AmqpSession(this, session);
-    }
-
-    public int getMaxSessions() {
-        return connection.getMaxChannels();
-    }
-
-    public void disconnect() {
-        closing = true;
-        transport.disconnect();
-    }
-
-    public void waitForDisconnected() throws Exception {
-        assertNotOnDispatchQueue();
-        getDisconnectedFuture().await();
-    }
-
-    public Future<Void> getDisconnectedFuture() {
-        final Promise<Void> rc = new Promise<Void>();
-        queue().execute(new Task() {
-            @Override
-            public void run() {
-                onDisconnected(rc);
-            }
-        });
-        return rc;
-    }
-
-    public void onDisconnected(Callback<Void> cb) {
-        transport.onTransportDisconnected(cb);
-    }
-
-    public TransportState getTransportState() {
-        return transport.getState();
-    }
-
-    public Throwable getTransportFailure() {
-        return transport.getFailure();
-    }
-
-    public Future<Throwable> getTransportFailureFuture() {
-        final Promise<Throwable> rc = new Promise<Throwable>();
-        queue().execute(new Task() {
-            @Override
-            public void run() {
-                onTransportFailure(rc);
-            }
-        });
-        return rc;
-    }
-
-    public void onTransportFailure(Callback<Throwable> cb) {
-        transport.onTransportFailure(cb);
-    }
-
-    @Override
-    public DispatchQueue queue() {
-        return super.queue();
-    }
-
-    public void setProtocolTracer(ProtocolTracer protocolTracer) {
-        transport.setProtocolTracer(protocolTracer);
-    }
-
-    public ProtocolTracer getProtocolTracer() {
-        return transport.getProtocolTracer();
-    }
-
-    /**
-     * Once the remote end, closes the transport is disconnected.
-     */
-    @Override
-    public void close() {
-        super.close();
-        onRemoteClose(new Callback<ErrorCondition>() {
-            @Override
-            public void onSuccess(ErrorCondition value) {
-                disconnect();
-            }
-
-            @Override
-            public void onFailure(Throwable value) {
-                disconnect();
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
deleted file mode 100644
index 1e9f4e2..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public interface AmqpDeliveryListener {
-
-    /**
-     * Caller should suspend/resume the AmqpReceiver to
-     * flow control the delivery of messages.
-     *
-     * @param delivery
-     */
-    void onMessageDelivery(MessageDelivery delivery);
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
deleted file mode 100644
index 4ebd8e2..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.hawtdispatch.impl.*;
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.fusesource.hawtdispatch.Dispatch;
-import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.Task;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract class AmqpEndpointBase extends WatchBase {
-    abstract protected Endpoint getEndpoint();
-    abstract protected AmqpEndpointBase getParent();
-
-    protected AmqpConnection getConnection() {
-        return getParent().getConnection();
-    }
-
-    protected AmqpTransport getTransport() {
-        return getConnection().transport;
-    }
-
-    protected DispatchQueue queue() {
-        return getTransport().queue();
-    }
-
-    protected void assertExecuting() {
-        getTransport().assertExecuting();
-    }
-
-    public void waitForRemoteOpen() throws Exception {
-        assertNotOnDispatchQueue();
-        getRemoteOpenFuture().await();
-    }
-
-    public Future<Void> getRemoteOpenFuture() {
-        final Promise<Void> rc = new Promise<Void>();
-        queue().execute(new Task() {
-            @Override
-            public void run() {
-                onRemoteOpen(rc);
-            }
-        });
-        return rc;
-    }
-
-    public void onRemoteOpen(final Callback<Void> cb) {
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                switch (getEndpoint().getRemoteState()) {
-                    case ACTIVE:
-                        cb.onSuccess(null);
-                        return true;
-                    case CLOSED:
-                        cb.onFailure(Support.illegalState("closed"));
-                        return true;
-                }
-                return false;
-            }
-        });
-    }
-
-    public ErrorCondition waitForRemoteClose() throws Exception {
-        assertNotOnDispatchQueue();
-        return getRemoteCloseFuture().await();
-    }
-
-    public Future<ErrorCondition> getRemoteCloseFuture() {
-        final Promise<ErrorCondition> rc = new Promise<ErrorCondition>();
-        queue().execute(new Task() {
-            @Override
-            public void run() {
-                onRemoteClose(rc);
-            }
-        });
-        return rc;
-    }
-
-    public void onRemoteClose(final Callback<ErrorCondition> cb) {
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                if (getEndpoint().getRemoteState() == EndpointState.CLOSED) {
-                    cb.onSuccess(getEndpoint().getRemoteCondition());
-                    return true;
-                }
-                return false;
-            }
-        });
-    }
-
-    public void close() {
-        getEndpoint().close();
-        pumpOut();
-    }
-
-    public EndpointState getRemoteState() {
-        return getEndpoint().getRemoteState();
-    }
-
-    public ErrorCondition getRemoteError() {
-        return getEndpoint().getRemoteCondition();
-    }
-
-    static protected ErrorCondition toError(Throwable value) {
-        return new ErrorCondition(Symbol.valueOf("error"), value.toString());
-    }
-
-    class Attachment extends Task {
-        AmqpEndpointBase endpoint() {
-            return AmqpEndpointBase.this;
-        }
-
-        @Override
-        public void run() {
-            fireWatches();
-        }
-    }
-
-    protected void attach() {
-        getTransport().context(getEndpoint()).setAttachment(new Attachment());
-    }
-
-    protected void defer(Defer defer) {
-        getTransport().defer(defer);
-    }
-
-    protected void pumpOut() {
-        getTransport().pumpOut();
-    }
-
-    static protected void assertNotOnDispatchQueue() {
-        assert Dispatch.getCurrentQueue()==null : "Not allowed to be called when executing on a dispatch queue";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
deleted file mode 100644
index dd6f32e..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.engine.Delivery;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract public class AmqpLink extends AmqpEndpointBase {
-    abstract protected void processDelivery(Delivery delivery);
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
deleted file mode 100644
index 644f72a..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.hawtdispatch.impl.Defer;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.ByteArrayOutputStream;
-
-import java.util.LinkedList;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpReceiver extends AmqpLink {
-
-    final AmqpSession parent;
-    final Receiver receiver;
-
-    public AmqpReceiver(AmqpSession parent, Receiver receiver2, QoS qos) {
-        this.parent = parent;
-        this.receiver = receiver2;
-        attach();
-    }
-
-    @Override
-    protected Receiver getEndpoint() {
-        return receiver;
-    }
-    @Override
-    protected AmqpSession getParent() {
-        return parent;
-    }
-
-    ByteArrayOutputStream current = new ByteArrayOutputStream();
-
-    @Override
-    protected void processDelivery(Delivery delivery) {
-        if( !delivery.isReadable() ) {
-            System.out.println("it was not readable!");
-            return;
-        }
-
-        if( current==null ) {
-            current = new ByteArrayOutputStream();
-        }
-
-        int count;
-        byte data[] = new byte[1024*4];
-        while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
-            current.write(data, 0, count);
-        }
-
-        // Expecting more deliveries..
-        if( count == 0 ) {
-            return;
-        }
-
-        receiver.advance();
-        Buffer buffer = current.toBuffer();
-        current = null;
-        onMessage(delivery, buffer);
-
-    }
-
-    LinkedList<MessageDelivery> inbound = new LinkedList<MessageDelivery>();
-
-    protected void onMessage(Delivery delivery, Buffer buffer) {
-        MessageDelivery md = new MessageDelivery(buffer) {
-            @Override
-            AmqpLink link() {
-                return AmqpReceiver.this;
-            }
-
-            @Override
-            public void settle() {
-                if( !delivery.isSettled() ) {
-                    delivery.disposition(new Accepted());
-                    delivery.settle();
-                }
-                drain();
-            }
-        };
-        md.delivery = delivery;
-        delivery.setContext(md);
-        inbound.add(md);
-        drainInbound();
-    }
-
-    public void drain() {
-        defer(deferedDrain);
-    }
-
-    Defer deferedDrain = new Defer(){
-        public void run() {
-            drainInbound();
-        }
-    };
-    int resumed = 0;
-
-    public void resume() {
-        resumed++;
-    }
-    public void suspend() {
-        resumed--;
-    }
-
-    AmqpDeliveryListener deliveryListener;
-    private void drainInbound() {
-        while( deliveryListener!=null && !inbound.isEmpty() && resumed>0) {
-            deliveryListener.onMessageDelivery(inbound.removeFirst());
-            receiver.flow(1);
-        }
-    }
-
-    public AmqpDeliveryListener getDeliveryListener() {
-        return deliveryListener;
-    }
-
-    public void setDeliveryListener(AmqpDeliveryListener deliveryListener) {
-        this.deliveryListener = deliveryListener;
-        drainInbound();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
deleted file mode 100644
index 9a672d5..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.hawtdispatch.impl.Defer;
-import org.apache.qpid.proton.hawtdispatch.impl.Watch;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.fusesource.hawtbuf.Buffer;
-
-import java.io.UnsupportedEncodingException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpSender extends AmqpLink {
-
-    private  byte[] EMPTY_BYTE_ARRAY = new byte[]{};
-    long nextTagId = 0;
-    HashSet<byte[]> tagCache = new HashSet<byte[]>();
-
-    final AmqpSession parent;
-    private final QoS qos;
-    final Sender sender;
-
-    public AmqpSender(AmqpSession parent, Sender sender2, QoS qos) {
-        this.parent = parent;
-        this.sender = sender2;
-        this.qos = qos;
-        attach();
-        getConnection().senders.add(this);
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        getConnection().senders.remove(this);
-    }
-
-    @Override
-    protected Sender getEndpoint() {
-        return sender;
-    }
-
-    @Override
-    protected AmqpSession getParent() {
-        return parent;
-    }
-
-    final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
-    long outboundBufferSize;
-
-    public MessageDelivery send(Message message) {
-        assertExecuting();
-        MessageDelivery rc = new MessageDelivery(message) {
-            @Override
-            AmqpLink link() {
-                return AmqpSender.this;
-            }
-
-            @Override
-            public void redeliver(boolean incrementDeliveryCounter) {
-                super.redeliver(incrementDeliveryCounter);
-                outbound.add(this);
-                outboundBufferSize += initialSize;
-                defer(deferedPumpDeliveries);
-            }
-        };
-        outbound.add(rc);
-        outboundBufferSize += rc.initialSize;
-        pumpDeliveries();
-        pumpOut();
-        return rc;
-    }
-
-    Buffer currentBuffer;
-    Delivery currentDelivery;
-
-    Defer deferedPumpDeliveries = new Defer() {
-        public void run() {
-            pumpDeliveries();
-        }
-    };
-
-    public long getOverflowBufferSize() {
-        return outboundBufferSize;
-    }
-
-    protected void pumpDeliveries() {
-        assertExecuting();
-        try {
-            while(true) {
-                while( currentBuffer !=null ) {
-                    if( sender.getCredit() > 0 ) {
-                        int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
-                        currentBuffer.moveHead(sent);
-                        if( currentBuffer.length == 0 ) {
-                            Delivery current = currentDelivery;
-                            MessageDelivery md = (MessageDelivery) current.getContext();
-                            currentBuffer = null;
-                            currentDelivery = null;
-                            if( qos == QoS.AT_MOST_ONCE ) {
-                                current.settle();
-                            } else {
-                                sender.advance();
-                            }
-                            md.fireWatches();
-                        }
-                    } else {
-                        return;
-                    }
-                }
-
-                if( outbound.isEmpty() ) {
-                    return;
-                }
-
-                final MessageDelivery md = outbound.removeFirst();
-                outboundBufferSize -= md.initialSize;
-                currentBuffer = md.encoded();
-                if( qos == QoS.AT_MOST_ONCE ) {
-                    currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
-                } else {
-                    final byte[] tag = nextTag();
-                    currentDelivery = sender.delivery(tag, 0, tag.length);
-                }
-                md.delivery = currentDelivery;
-                currentDelivery.setContext(md);
-            }
-        } finally {
-            fireWatches();
-        }
-    }
-
-    @Override
-    protected void processDelivery(Delivery delivery) {
-        final MessageDelivery md  = (MessageDelivery) delivery.getContext();
-        if( delivery.remotelySettled() ) {
-            if( delivery.getTag().length > 0 ) {
-                checkinTag(delivery.getTag());
-            }
-
-            final DeliveryState state = delivery.getRemoteState();
-            if( state==null || state instanceof Accepted) {
-                if( !delivery.remotelySettled() ) {
-                    delivery.disposition(new Accepted());
-                }
-            } else if( state instanceof Rejected) {
-                // re-deliver /w incremented delivery counter.
-                md.delivery = null;
-                md.incrementDeliveryCount();
-                outbound.addLast(md);
-            } else if( state instanceof Released) {
-                // re-deliver && don't increment the counter.
-                md.delivery = null;
-                outbound.addLast(md);
-            } else if( state instanceof Modified) {
-                Modified modified = (Modified) state;
-                if ( modified.getDeliveryFailed() ) {
-                  // increment delivery counter..
-                  md.incrementDeliveryCount();
-                }
-            }
-            delivery.settle();
-        }
-        md.fireWatches();
-    }
-
-    byte[] nextTag() {
-        byte[] rc;
-        if (tagCache != null && !tagCache.isEmpty()) {
-            final Iterator<byte[]> iterator = tagCache.iterator();
-            rc = iterator.next();
-            iterator.remove();
-        } else {
-            try {
-                rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        return rc;
-    }
-
-    void checkinTag(byte[] data) {
-        if( tagCache.size() < 1024 ) {
-            tagCache.add(data);
-        }
-    }
-
-    public void onOverflowBufferDrained(final Callback<Void> cb) {
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                if (outboundBufferSize==0) {
-                    cb.onSuccess(null);
-                    return true;
-                }
-                return false;
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
deleted file mode 100644
index b25a1b7..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.ProtonJSession;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.messaging.*;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-
-import java.util.UUID;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpSession extends AmqpEndpointBase {
-
-    final AmqpConnection parent;
-    final ProtonJSession session;
-
-
-    public AmqpSession(AmqpConnection parent, ProtonJSession session) {
-        this.parent = parent;
-        this.session = session;
-        attach();
-    }
-
-    @Override
-    protected Endpoint getEndpoint() {
-        return session;
-    }
-
-    @Override
-    protected AmqpConnection getParent() {
-        return parent;
-    }
-
-    public AmqpSender createSender(Target target) {
-        return createSender(target, QoS.AT_LEAST_ONCE);
-    }
-
-    public AmqpSender createSender(Target target, QoS qos) {
-        return createSender(target, qos, UUID.randomUUID().toString());
-    }
-
-    public AmqpSender createSender(Target target, QoS qos, String name) {
-        assertExecuting();
-        Sender sender = session.sender(name);
-        attach();
-//        Source source = new Source();
-//        source.setAddress(UUID.randomUUID().toString());
-//        sender.setSource(source);
-        sender.setTarget(target);
-        configureQos(sender, qos);
-        sender.open();
-        pumpOut();
-        return new AmqpSender(this, sender, qos);
-    }
-
-    public AmqpReceiver createReceiver(Source source) {
-        return createReceiver(source, QoS.AT_LEAST_ONCE);
-    }
-
-    public AmqpReceiver createReceiver(Source source, QoS qos) {
-        return createReceiver(source, qos, 100);
-    }
-
-    public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch) {
-        return createReceiver(source, qos, prefetch,  UUID.randomUUID().toString());
-    }
-
-    public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch, String name) {
-        assertExecuting();
-        Receiver receiver = session.receiver(name);
-        receiver.setSource(source);
-//        Target target = new Target();
-//        target.setAddress(UUID.randomUUID().toString());
-//        receiver.setTarget(target);
-        receiver.flow(prefetch);
-        configureQos(receiver, qos);
-        receiver.open();
-        pumpOut();
-        return new AmqpReceiver(this, receiver, qos);
-    }
-
-    private void configureQos(Link link, QoS qos) {
-        switch (qos) {
-            case AT_MOST_ONCE:
-                link.setSenderSettleMode(SenderSettleMode.SETTLED);
-                link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-                break;
-            case AT_LEAST_ONCE:
-                link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-                link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-                break;
-            case EXACTLY_ONCE:
-                link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-                link.setReceiverSettleMode(ReceiverSettleMode.SECOND);
-                break;
-        }
-    }
-
-    public Message createTextMessage(String value) {
-        Message msg = Message.Factory.create();
-        Section body = new AmqpValue(value);
-        msg.setBody(body);
-        return msg;
-    }
-
-    public Message createBinaryMessage(byte value[]) {
-        return createBinaryMessage(value, 0, value.length);
-    }
-
-    public Message createBinaryMessage(byte value[], int offset, int len) {
-        Message msg = Message.Factory.create();
-        Data body = new Data(new Binary(value, offset,len));
-        msg.setBody(body);
-        return msg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
deleted file mode 100644
index 89fbdd1..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-/**
- * <p>
- * Function Result that carries one value.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public interface Callback<T> {
-    public void onSuccess(T value);
-    public void onFailure(Throwable value);
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
deleted file mode 100644
index e53f512..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-/**
- * <p>
- * Function Result that carries one value.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract public class ChainedCallback<In,Out> implements Callback<In> {
-
-    public final Callback<Out> next;
-
-    public ChainedCallback(Callback<Out> next) {
-        this.next = next;
-    }
-
-    public void onFailure(Throwable value) {
-        next.onFailure(value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
deleted file mode 100644
index 290076f..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.engine.Delivery;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract public class DeliveryAttachment {
-    abstract void processDelivery(Delivery delivery);
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
deleted file mode 100644
index 4a9eb5e..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * <p>A simplified Future function results interface.</p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public interface Future<T> {
-    T await() throws Exception;
-    T await(long amount, TimeUnit unit) throws Exception;
-    void then(Callback<T> callback);
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
deleted file mode 100644
index b115557..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.hawtdispatch.impl.Watch;
-import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtdispatch.Task;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public abstract class MessageDelivery extends WatchBase {
-
-    final int initialSize;
-    private Message message;
-    private Buffer encoded;
-    public Delivery delivery;
-    private int sizeHint = 32;
-
-    static Buffer encode(Message message, int sizeHint) {
-        byte[] buffer = new byte[sizeHint];
-        int size = ((ProtonJMessage)message).encode2(buffer, 0, sizeHint);
-        if( size > sizeHint ) {
-            buffer = new byte[size];
-            size = message.encode(buffer, 0, size);
-        }
-        return new Buffer(buffer, 0, size);
-    }
-
-    static Message decode(Buffer buffer) {
-        Message msg = Message.Factory.create();
-        int offset = buffer.offset;
-        int len = buffer.length;
-        while( len > 0 ) {
-            int decoded = msg.decode(buffer.data, offset, len);
-            assert decoded > 0: "Make progress decoding the message";
-            offset += decoded;
-            len -= decoded;
-        }
-        return msg;
-    }
-
-    public MessageDelivery(Message message) {
-        this(message, encode(message, 32));
-    }
-
-    public MessageDelivery(Buffer encoded) {
-        this(null, encoded);
-    }
-
-    public MessageDelivery(Message message, Buffer encoded) {
-        this.message = message;
-        this.encoded = encoded;
-        sizeHint = this.encoded.length;
-        initialSize = sizeHint;
-    }
-
-    public Message getMessage() {
-        if( message == null ) {
-            message = decode(encoded);
-        }
-        return message;
-    }
-
-    public Buffer encoded() {
-        if( encoded == null ) {
-            encoded = encode(message, sizeHint);
-            sizeHint = encoded.length;
-        }
-        return encoded;
-    }
-
-    public boolean isSettled() {
-        return delivery!=null && delivery.isSettled();
-    }
-
-    public DeliveryState getRemoteState() {
-        return delivery==null ? null : delivery.getRemoteState();
-    }
-
-    public DeliveryState getLocalState() {
-        return delivery==null ? null : delivery.getLocalState();
-    }
-
-    public void onEncoded(final Callback<Void> cb) {
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                if( delivery!=null ) {
-                    cb.onSuccess(null);
-                    return true;
-                }
-                return false;
-            }
-        });
-    }
-
-    /**
-     * @return the remote delivery state when it changes.
-     * @throws Exception
-     */
-    public DeliveryState getRemoteStateChange() throws Exception {
-        AmqpEndpointBase.assertNotOnDispatchQueue();
-        return getRemoteStateChangeFuture().await();
-    }
-
-    /**
-     * @return the future remote delivery state when it changes.
-     */
-    public Future<DeliveryState> getRemoteStateChangeFuture() {
-        final Promise<DeliveryState> rc = new Promise<DeliveryState>();
-        link().queue().execute(new Task() {
-            @Override
-            public void run() {
-                onRemoteStateChange(rc);
-            }
-        });
-        return rc;
-    }
-
-    abstract AmqpLink link();
-
-    boolean watchingRemoteStateChange;
-    public void onRemoteStateChange(final Callback<DeliveryState> cb) {
-        watchingRemoteStateChange = true;
-        final DeliveryState original = delivery.getRemoteState();
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                if (original == null) {
-                    if( delivery.getRemoteState()!=null ) {
-                        cb.onSuccess(delivery.getRemoteState());
-                        watchingRemoteStateChange = false;
-                        return true;
-                    }
-                } else {
-                    if( !original.equals(delivery.getRemoteState()) ) {
-                        cb.onSuccess(delivery.getRemoteState());
-                        watchingRemoteStateChange = false;
-                        return true;
-                    }
-                }
-                return false;
-            }
-        });
-    }
-
-    /**
-     * @return the remote delivery state once settled.
-     * @throws Exception
-     */
-    public DeliveryState getSettle() throws Exception {
-        AmqpEndpointBase.assertNotOnDispatchQueue();
-        return getSettleFuture().await();
-    }
-
-    /**
-     * @return the future remote delivery state once the delivery is settled.
-     */
-    public Future<DeliveryState> getSettleFuture() {
-        final Promise<DeliveryState> rc = new Promise<DeliveryState>();
-        link().queue().execute(new Task() {
-            @Override
-            public void run() {
-                onSettle(rc);
-            }
-        });
-        return rc;
-    }
-
-    public void onSettle(final Callback<DeliveryState> cb) {
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                if( delivery!=null && delivery.isSettled() ) {
-                    cb.onSuccess(delivery.getRemoteState());
-                    return true;
-                }
-                return false;
-            }
-        });
-    }
-
-    @Override
-    protected void fireWatches() {
-        super.fireWatches();
-    }
-
-    void incrementDeliveryCount() {
-        Message msg = getMessage();
-        msg.setDeliveryCount(msg.getDeliveryCount()+1);
-        encoded = null;
-    }
-
-    public void redeliver(boolean incrementDeliveryCounter) {
-        if( incrementDeliveryCounter ) {
-            incrementDeliveryCount();
-        }
-    }
-
-    public void settle() {
-        if( !delivery.isSettled() ) {
-            delivery.settle();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
deleted file mode 100644
index b914b44..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class Promise<T> implements Callback<T>, Future<T> {
-
-    private final CountDownLatch latch = new CountDownLatch(1);
-    Callback<T> next;
-    Throwable error;
-    T value;
-
-    public void onFailure(Throwable value) {
-        Callback<T> callback = null;
-        synchronized(this)  {
-            error = value;
-            latch.countDown();
-            callback = next;
-        }
-        if( callback!=null ) {
-            callback.onFailure(value);
-        }
-    }
-
-    public void onSuccess(T value) {
-        Callback<T> callback = null;
-        synchronized(this)  {
-            this.value = value;
-            latch.countDown();
-            callback = next;
-        }
-        if( callback!=null ) {
-            callback.onSuccess(value);
-        }
-    }
-
-    public void then(Callback<T> callback) {
-        boolean fire = false;
-        synchronized(this)  {
-            next = callback;
-            if( latch.getCount() == 0 ) {
-                fire = true;
-            }
-        }
-        if( fire ) {
-            if( error!=null ) {
-                callback.onFailure(error);
-            } else {
-                callback.onSuccess(value);
-            }
-        }
-    }
-
-    public T await(long amount, TimeUnit unit) throws Exception {
-        if( latch.await(amount, unit) ) {
-            return get();
-        } else {
-            throw new TimeoutException();
-        }
-    }
-
-    public T await() throws Exception {
-        latch.await();
-        return get();
-    }
-
-    private T get() throws Exception {
-        Throwable e = error;
-        if( e !=null ) {
-            if( e instanceof RuntimeException ) {
-                throw (RuntimeException) e;
-            } else if( e instanceof Exception) {
-                throw (Exception) e;
-            } else if( e instanceof Error) {
-                throw (Error) e;
-            } else {
-                // don't expect to hit this case.
-                throw new RuntimeException(e);
-            }
-        }
-        return value;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
deleted file mode 100644
index 5b4a8dc..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public enum QoS {
-    AT_MOST_ONCE,
-    AT_LEAST_ONCE,
-    EXACTLY_ONCE
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
deleted file mode 100644
index 4ebf21a..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public enum TransportState {
-    CREATED,
-    CONNECTING,
-    CONNECTED,
-    DISCONNECTING,
-    DISCONNECTED
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
deleted file mode 100644
index de8a2cd..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.fusesource.hawtbuf.Buffer;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpHeader {
-
-    static final Buffer PREFIX = new Buffer(new byte[]{
-      'A', 'M', 'Q', 'P'
-    });
-
-    private Buffer buffer;
-
-    public AmqpHeader(){
-        this(new Buffer(new byte[]{
-          'A', 'M', 'Q', 'P', 0, 1, 0, 0
-        }));
-    }
-
-    public AmqpHeader(Buffer buffer){
-        setBuffer(buffer);
-    }
-
-    public int getProtocolId() {
-        return buffer.get(4) & 0xFF;
-    }
-    public void setProtocolId(int value) {
-        buffer.data[buffer.offset+4] = (byte) value;
-    }
-
-    public int getMajor() {
-        return buffer.get(5) & 0xFF;
-    }
-    public void setMajor(int value) {
-        buffer.data[buffer.offset+5] = (byte) value;
-    }
-
-    public int getMinor() {
-        return buffer.get(6) & 0xFF;
-    }
-    public void setMinor(int value) {
-        buffer.data[buffer.offset+6] = (byte) value;
-    }
-
-    public int getRevision() {
-        return buffer.get(7) & 0xFF;
-    }
-    public void setRevision(int value) {
-        buffer.data[buffer.offset+7] = (byte) value;
-    }
-
-    public Buffer getBuffer() {
-        return buffer;
-    }
-    public void setBuffer(Buffer value) {
-        if( !value.startsWith(PREFIX) || value.length()!=8 ) {
-            throw new IllegalArgumentException("Not an AMQP header buffer");
-        }
-        buffer = value.buffer();
-    }
-
-
-    @Override
-    public String toString() {
-        return buffer.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
deleted file mode 100644
index f372d99..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.*;
-import org.fusesource.hawtdispatch.Task;
-
-import java.io.IOException;
-
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class AmqpListener {
-
-    public Sasl processSaslConnect(ProtonJTransport protonTransport) {
-        return null;
-    }
-
-    public Sasl processSaslEvent(Sasl sasl) {
-        return sasl;
-    }
-
-    public void processRemoteOpen(Endpoint endpoint, Task onComplete) {
-        ErrorCondition condition = endpoint.getCondition();
-        condition.setCondition(Symbol.valueOf("error"));
-        condition.setDescription("Not supported");
-        endpoint.close();
-        onComplete.run();
-    }
-
-    public void processRemoteClose(Endpoint endpoint, Task onComplete) {
-        endpoint.close();
-        onComplete.run();
-    }
-
-    public void processDelivery(Delivery delivery){
-    }
-
-    public void processTransportConnected() {
-    }
-
-    public void processTransportFailure(IOException e) {
-        this.processFailure(e);
-    }
-
-    public void processFailure(Throwable e) {
-        e.printStackTrace();
-    }
-
-    public void processRefill() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
deleted file mode 100644
index 13ed1e3..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec;
-
-import java.io.IOException;
-
-/**
- * A HawtDispatch protocol codec that encodes/decodes AMQP 1.0 frames.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpProtocolCodec extends AbstractProtocolCodec {
-
-    int maxFrameSize = 4*1024*1024;
-
-    @Override
-    protected void encode(Object object) throws IOException {
-        nextWriteBuffer.write((Buffer) object);
-    }
-
-    @Override
-    protected Action initialDecodeAction() {
-        return new Action() {
-            public Object apply() throws IOException {
-                Buffer magic = readBytes(8);
-                if (magic != null) {
-                    nextDecodeAction = readFrameSize;
-                    return new AmqpHeader(magic);
-                } else {
-                    return null;
-                }
-            }
-        };
-    }
-
-    private final Action readFrameSize = new Action() {
-        public Object apply() throws IOException {
-            Buffer sizeBytes = peekBytes(4);
-            if (sizeBytes != null) {
-                int size = sizeBytes.bigEndianEditor().readInt();
-                if (size < 8) {
-                    throw new IOException(String.format("specified frame size %d is smaller than minimum frame size", size));
-                }
-                if( size > maxFrameSize ) {
-                    throw new IOException(String.format("specified frame size %d is larger than maximum frame size", size));
-                }
-
-                // TODO: check frame min and max size..
-                nextDecodeAction = readFrame(size);
-                return nextDecodeAction.apply();
-            } else {
-                return null;
-            }
-        }
-    };
-
-
-    private final Action readFrame(final int size) {
-        return new Action() {
-            public Object apply() throws IOException {
-                Buffer frameData = readBytes(size);
-                if (frameData != null) {
-                    nextDecodeAction = readFrameSize;
-                    return frameData;
-                } else {
-                    return null;
-                }
-            }
-        };
-    }
-
-    public int getReadBytesPendingDecode() {
-        return readBuffer.position() - readStart;
-    }
-
-    public void skipProtocolHeader() {
-        nextDecodeAction = readFrameSize;
-    }
-
-    public void readProtocolHeader() {
-        nextDecodeAction = initialDecodeAction();
-    }
-
-    public int getMaxFrameSize() {
-        return maxFrameSize;
-    }
-
-    public void setMaxFrameSize(int maxFrameSize) {
-        this.maxFrameSize = maxFrameSize;
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-proton git commit: PROTON-1188, PROTON-1189: remove stale contrib/proton-jms and contrib/proton-hawtdispatch modules

Posted by ro...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
deleted file mode 100644
index bc3ec2e..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.apache.qpid.proton.hawtdispatch.api.AmqpConnectOptions;
-import org.apache.qpid.proton.hawtdispatch.api.Callback;
-import org.apache.qpid.proton.hawtdispatch.api.ChainedCallback;
-import org.apache.qpid.proton.hawtdispatch.api.TransportState;
-import org.apache.qpid.proton.engine.*;
-import org.apache.qpid.proton.engine.impl.ByteBufferUtils;
-import org.apache.qpid.proton.engine.impl.ProtocolTracer;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.DataByteArrayOutputStream;
-import org.fusesource.hawtbuf.UTF8Buffer;
-import org.fusesource.hawtdispatch.*;
-import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
-import org.fusesource.hawtdispatch.transport.SslTransport;
-import org.fusesource.hawtdispatch.transport.TcpTransport;
-import org.fusesource.hawtdispatch.transport.Transport;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.LinkedList;
-
-import static org.apache.qpid.proton.hawtdispatch.api.TransportState.*;
-import static org.fusesource.hawtdispatch.Dispatch.NOOP;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpTransport extends WatchBase {
-
-    private TransportState state = CREATED;
-
-    final DispatchQueue queue;
-    final ProtonJConnection connection;
-    Transport hawtdispatchTransport;
-    ProtonJTransport protonTransport;
-    Throwable failure;
-    CustomDispatchSource<Defer,LinkedList<Defer>> defers;
-
-    public static final EnumSet<EndpointState> ALL_SET = EnumSet.allOf(EndpointState.class);
-
-    private AmqpTransport(DispatchQueue queue) {
-        this.queue = queue;
-        this.connection = (ProtonJConnection) Connection.Factory.create();
-
-        defers = Dispatch.createSource(EventAggregators.<Defer>linkedList(), this.queue);
-        defers.setEventHandler(new Task(){
-            public void run() {
-                for( Defer defer: defers.getData() ) {
-                    assert defer.defered = true;
-                    defer.defered = false;
-                    defer.run();
-                }
-            }
-        });
-        defers.resume();
-    }
-
-    static public AmqpTransport connect(AmqpConnectOptions options) {
-        AmqpConnectOptions opts = options.clone();
-        if( opts.getDispatchQueue() == null ) {
-            opts.setDispatchQueue(Dispatch.createQueue());
-        }
-        if( opts.getBlockingExecutor() == null ) {
-            opts.setBlockingExecutor(AmqpConnectOptions.getBlockingThreadPool());
-        }
-        return new AmqpTransport(opts.getDispatchQueue()).connecting(opts);
-    }
-
-    private AmqpTransport connecting(final AmqpConnectOptions options) {
-        assert state == CREATED;
-        try {
-            state = CONNECTING;
-            if( options.getLocalContainerId()!=null ) {
-                connection.setLocalContainerId(options.getLocalContainerId());
-            }
-            if( options.getRemoteContainerId()!=null ) {
-                connection.setContainer(options.getRemoteContainerId());
-            }
-            connection.setHostname(options.getHost().getHost());
-            Callback<Void> onConnect = new Callback<Void>() {
-                @Override
-                public void onSuccess(Void value) {
-                    if( state == CONNECTED ) {
-                        hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
-                        fireWatches();
-                    }
-                }
-
-                @Override
-                public void onFailure(Throwable value) {
-                    if( state == CONNECTED || state == CONNECTING ) {
-                        failure = value;
-                        disconnect();
-                        fireWatches();
-                    }
-                }
-            };
-            if( options.getUser()!=null ) {
-                onConnect = new SaslClientHandler(options, onConnect);
-            }
-            createTransport(options, onConnect);
-        } catch (Throwable e) {
-            failure = e;
-        }
-        fireWatches();
-        return this;
-    }
-
-    public TransportState getState() {
-        return state;
-    }
-
-    /**
-     * Creates and start a transport to the AMQP server.  Passes it to the onConnect
-     * once the transport is connected.
-     *
-     * @param onConnect
-     * @throws Exception
-     */
-    void createTransport(AmqpConnectOptions options, final Callback<Void> onConnect) throws Exception {
-        final TcpTransport transport;
-        if( options.getSslContext() !=null ) {
-            SslTransport ssl = new SslTransport();
-            ssl.setSSLContext(options.getSslContext());
-            transport = ssl;
-        } else {
-            transport = new TcpTransport();
-        }
-
-        URI host = options.getHost();
-        if( host.getPort() == -1 ) {
-            if( options.getSslContext()!=null ) {
-                host = new URI(host.getScheme()+"://"+host.getHost()+":5672");
-            } else {
-                host = new URI(host.getScheme()+"://"+host.getHost()+":5671");
-            }
-        }
-
-
-        transport.setBlockingExecutor(options.getBlockingExecutor());
-        transport.setDispatchQueue(options.getDispatchQueue());
-
-        transport.setMaxReadRate(options.getMaxReadRate());
-        transport.setMaxWriteRate(options.getMaxWriteRate());
-        transport.setReceiveBufferSize(options.getReceiveBufferSize());
-        transport.setSendBufferSize(options.getSendBufferSize());
-        transport.setTrafficClass(options.getTrafficClass());
-        transport.setUseLocalHost(options.isUseLocalHost());
-
-        transport.setTransportListener(new DefaultTransportListener(){
-            public void onTransportConnected() {
-                if(state==CONNECTING) {
-                    state = CONNECTED;
-                    onConnect.onSuccess(null);
-                    transport.resumeRead();
-                }
-            }
-
-            public void onTransportFailure(final IOException error) {
-                if(state==CONNECTING) {
-                    onConnect.onFailure(error);
-                }
-            }
-
-        });
-        transport.connecting(host, options.getLocalAddress());
-        bind(transport);
-        transport.start(NOOP);
-    }
-
-    class SaslClientHandler extends ChainedCallback<Void, Void> {
-
-        private final AmqpConnectOptions options;
-
-        public SaslClientHandler(AmqpConnectOptions options, Callback<Void> next) {
-            super(next);
-            this.options = options;
-        }
-
-        public void onSuccess(final Void value) {
-            final Sasl s = protonTransport.sasl();
-            s.client();
-            pumpOut();
-            hawtdispatchTransport.setTransportListener(new AmqpTransportListener() {
-
-                Sasl sasl = s;
-
-                @Override
-                void process() {
-                    if (sasl != null) {
-                        sasl = processSaslEvent(sasl);
-                        if (sasl == null) {
-                            // once sasl handshake is done.. we need to read the protocol header again.
-                            ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
-                        }
-                    }
-                }
-
-                @Override
-                public void onTransportFailure(IOException error) {
-                    next.onFailure(error);
-                }
-
-                @Override
-                void onFailure(Throwable error) {
-                    next.onFailure(error);
-                }
-
-                boolean authSent = false;
-
-                private Sasl processSaslEvent(Sasl sasl) {
-                    if (sasl.getOutcome() == Sasl.SaslOutcome.PN_SASL_OK) {
-                        next.onSuccess(null);
-                        return null;
-                    }
-                    HashSet<String> mechanisims = new HashSet<String>(Arrays.asList(sasl.getRemoteMechanisms()));
-                    if (!authSent && !mechanisims.isEmpty()) {
-                        if (mechanisims.contains("PLAIN")) {
-                            authSent = true;
-                            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
-                            try {
-                                os.writeByte(0);
-                                os.write(new UTF8Buffer(options.getUser()));
-                                os.writeByte(0);
-                                if (options.getPassword() != null) {
-                                    os.write(new UTF8Buffer(options.getPassword()));
-                                }
-                            } catch (IOException e) {
-                                throw new RuntimeException(e);
-                            }
-                            Buffer buffer = os.toBuffer();
-                            sasl.setMechanisms(new String[]{"PLAIN"});
-                            sasl.send(buffer.data, buffer.offset, buffer.length);
-                        } else if (mechanisims.contains("ANONYMOUS")) {
-                            authSent = true;
-                            sasl.setMechanisms(new String[]{"ANONYMOUS"});
-                            sasl.send(new byte[0], 0, 0);
-                        } else {
-                            next.onFailure(Support.illegalState("Remote does not support plain password authentication."));
-                            return null;
-                        }
-                    }
-                    return sasl;
-                }
-            });
-        }
-    }
-
-    class SaslServerListener extends AmqpTransportListener {
-        Sasl sasl;
-
-        @Override
-        public void onTransportCommand(Object command) {
-            try {
-                if (command.getClass() == AmqpHeader.class) {
-                    AmqpHeader header = (AmqpHeader)command;
-                    switch( header.getProtocolId() ) {
-                        case 3: // Client will be using SASL for auth..
-                            if( listener!=null ) {
-                                sasl = listener.processSaslConnect(protonTransport);
-                                break;
-                            }
-                        default:
-                            AmqpTransportListener listener = new AmqpTransportListener();
-                            hawtdispatchTransport.setTransportListener(listener);
-                            listener.onTransportCommand(command);
-                            return;
-                    }
-                    command = header.getBuffer();
-                }
-            } catch (Exception e) {
-                onFailure(e);
-            }
-            super.onTransportCommand(command);
-        }
-
-        @Override
-        void process() {
-            if (sasl != null) {
-                sasl = listener.processSaslEvent(sasl);
-            }
-            if (sasl == null) {
-                // once sasl handshake is done.. we need to read the protocol header again.
-                ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
-                hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
-            }
-        }
-    }
-
-    static public AmqpTransport accept(Transport transport) {
-        return new AmqpTransport(transport.getDispatchQueue()).accepted(transport);
-    }
-
-    private AmqpTransport accepted(final Transport transport) {
-        state = CONNECTED;
-        bind(transport);
-        hawtdispatchTransport.setTransportListener(new SaslServerListener());
-        return this;
-    }
-
-    private void bind(final Transport transport) {
-        this.hawtdispatchTransport = transport;
-        this.protonTransport = (ProtonJTransport) org.apache.qpid.proton.engine.Transport.Factory.create();
-        this.protonTransport.bind(connection);
-        if( transport.getProtocolCodec()==null ) {
-            try {
-                transport.setProtocolCodec(new AmqpProtocolCodec());
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    public void defer(Defer defer) {
-        if( !defer.defered ) {
-            defer.defered = true;
-            defers.merge(defer);
-        }
-    }
-
-    public void pumpOut() {
-        assertExecuting();
-        defer(deferedPumpOut);
-    }
-
-    private Defer deferedPumpOut = new Defer() {
-        public void run() {
-            doPumpOut();
-        }
-    };
-
-    private void doPumpOut() {
-        switch(state) {
-            case CONNECTING:
-            case CONNECTED:
-                break;
-            default:
-                return;
-        }
-
-        int size = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize();
-        byte data[] = new byte[size];
-        boolean done = false;
-        int pumped = 0;
-        while( !done && !hawtdispatchTransport.full() ) {
-            int count = protonTransport.output(data, 0, size);
-            if( count > 0 ) {
-                pumped += count;
-                boolean accepted = hawtdispatchTransport.offer(new Buffer(data, 0, count));
-                assert accepted: "Should be accepted since the transport was not full";
-            } else {
-                done = true;
-            }
-        }
-        if( pumped > 0 && !hawtdispatchTransport.full() ) {
-            listener.processRefill();
-        }
-    }
-
-    public Sasl sasl;
-    public void fireListenerEvents() {
-        fireWatches();
-
-        if( sasl!=null ) {
-            sasl = listener.processSaslEvent(sasl);
-            if( sasl==null ) {
-                // once sasl handshake is done.. we need to read the protocol header again.
-                ((AmqpProtocolCodec)this.hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
-            }
-        }
-
-        context(connection).fireListenerEvents(listener);
-
-        Session session = connection.sessionHead(ALL_SET, ALL_SET);
-        while(session != null)
-        {
-            context(session).fireListenerEvents(listener);
-            session = session.next(ALL_SET, ALL_SET);
-        }
-
-        Link link = connection.linkHead(ALL_SET, ALL_SET);
-        while(link != null)
-        {
-            context(link).fireListenerEvents(listener);
-            link = link.next(ALL_SET, ALL_SET);
-        }
-
-        Delivery delivery = connection.getWorkHead();
-        while(delivery != null)
-        {
-            listener.processDelivery(delivery);
-            delivery = delivery.getWorkNext();
-        }
-
-        listener.processRefill();
-    }
-
-
-    public ProtonJConnection connection() {
-        return connection;
-    }
-
-    AmqpListener listener = new AmqpListener();
-    public AmqpListener getListener() {
-        return listener;
-    }
-
-    public void setListener(AmqpListener listener) {
-        this.listener = listener;
-    }
-
-    public EndpointContext context(Endpoint endpoint) {
-        EndpointContext context = (EndpointContext) endpoint.getContext();
-        if( context == null ) {
-            context = new EndpointContext(this, endpoint);
-            endpoint.setContext(context);
-        }
-        return context;
-    }
-
-    class AmqpTransportListener extends DefaultTransportListener {
-
-        @Override
-        public void onTransportConnected() {
-            if( listener!=null ) {
-                listener.processTransportConnected();
-            }
-        }
-
-        @Override
-        public void onRefill() {
-            if( listener!=null ) {
-                listener.processRefill();
-            }
-        }
-
-        @Override
-        public void onTransportCommand(Object command) {
-            if( state != CONNECTED ) {
-                return;
-            }
-            try {
-                Buffer buffer;
-                if (command.getClass() == AmqpHeader.class) {
-                    buffer = ((AmqpHeader) command).getBuffer();
-                } else {
-                    buffer = (Buffer) command;
-                }
-                ByteBuffer bbuffer = buffer.toByteBuffer();
-                do {
-                  ByteBuffer input = protonTransport.getInputBuffer();
-                  ByteBufferUtils.pour(bbuffer, input);
-                  protonTransport.processInput();
-                } while (bbuffer.remaining() > 0);
-                process();
-                pumpOut();
-            } catch (Exception e) {
-                onFailure(e);
-            }
-        }
-
-        void process() {
-            fireListenerEvents();
-        }
-
-        @Override
-        public void onTransportFailure(IOException error) {
-            if( state==CONNECTED ) {
-                failure = error;
-                if( listener!=null ) {
-                    listener.processTransportFailure(error);
-                    fireWatches();
-                }
-            }
-        }
-
-        void onFailure(Throwable error) {
-            failure = error;
-            if( listener!=null ) {
-                listener.processFailure(error);
-                fireWatches();
-            }
-        }
-    }
-
-    public void disconnect() {
-        assertExecuting();
-        if( state == CONNECTING || state==CONNECTED) {
-            state = DISCONNECTING;
-            if( hawtdispatchTransport!=null ) {
-                hawtdispatchTransport.stop(new Task(){
-                    public void run() {
-                        state = DISCONNECTED;
-                        hawtdispatchTransport = null;
-                        protonTransport = null;
-                        fireWatches();
-                    }
-                });
-            }
-        }
-    }
-
-    public DispatchQueue queue() {
-        return queue;
-    }
-
-    public void assertExecuting() {
-        queue().assertExecuting();
-    }
-
-    public void onTransportConnected(final Callback<Void> cb) {
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                if( failure !=null ) {
-                    cb.onFailure(failure);
-                    return true;
-                }
-                if( state!=CONNECTING ) {
-                    cb.onSuccess(null);
-                    return true;
-                }
-                return false;
-            }
-        });
-    }
-
-    public void onTransportDisconnected(final Callback<Void> cb) {
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                if( state==DISCONNECTED ) {
-                    cb.onSuccess(null);
-                    return true;
-                }
-                return false;
-            }
-        });
-    }
-
-    public void onTransportFailure(final Callback<Throwable> cb) {
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                if( failure!=null ) {
-                    cb.onSuccess(failure);
-                    return true;
-                }
-                return false;
-            }
-        });
-    }
-
-    public Throwable getFailure() {
-        return failure;
-    }
-
-    public void setProtocolTracer(ProtocolTracer protocolTracer) {
-        protonTransport.setProtocolTracer(protocolTracer);
-    }
-
-    public ProtocolTracer getProtocolTracer() {
-        return protonTransport.getProtocolTracer();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
deleted file mode 100644
index eee8241..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.fusesource.hawtdispatch.Task;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract public class Defer extends Task {
-    boolean defered;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
deleted file mode 100644
index c12a849..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.fusesource.hawtdispatch.Task;
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class EndpointContext {
-
-    private final AmqpTransport transport;
-    private final Endpoint endpoint;
-    private Object attachment;
-    boolean listenerProcessing;
-
-    public EndpointContext(AmqpTransport transport, Endpoint endpoint) {
-        this.transport = transport;
-        this.endpoint = endpoint;
-    }
-
-    class ProcessedTask extends Task {
-        @Override
-        public void run() {
-            transport.assertExecuting();
-            listenerProcessing = false;
-            transport.pumpOut();
-        }
-    }
-
-    public void fireListenerEvents(AmqpListener listener) {
-        if( listener!=null && !listenerProcessing ) {
-            if( endpoint.getLocalState() == EndpointState.UNINITIALIZED &&
-                endpoint.getRemoteState() != EndpointState.UNINITIALIZED ) {
-                listenerProcessing = true;
-                listener.processRemoteOpen(endpoint, new ProcessedTask());
-            } else if( endpoint.getLocalState() == EndpointState.ACTIVE &&
-                endpoint.getRemoteState() == EndpointState.CLOSED ) {
-                listenerProcessing = true;
-                listener.processRemoteClose(endpoint, new ProcessedTask());
-            }
-        }
-        if( attachment !=null && attachment instanceof Task ) {
-            ((Task) attachment).run();
-        }
-    }
-
-    public Object getAttachment() {
-        return attachment;
-    }
-
-    public <T> T getAttachment(Class<T> clazz) {
-        return clazz.cast(getAttachment());
-    }
-
-    public void setAttachment(Object attachment) {
-        this.attachment = attachment;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
deleted file mode 100644
index 8d6f83b..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class Support {
-
-    public static IllegalStateException illegalState(String msg) {
-        return (IllegalStateException) new IllegalStateException(msg).fillInStackTrace();
-    }
-
-    public static IllegalStateException createUnhandledEventError() {
-        return illegalState("Unhandled event.");
-    }
-
-    public static IllegalStateException createListenerNotSetError() {
-        return illegalState("No connection listener set to handle message received from the server.");
-    }
-
-    public static IllegalStateException createDisconnectedError() {
-        return illegalState("Disconnected");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
deleted file mode 100644
index 6bb7603..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public abstract class Watch {
-    /* returns true if the watch has been triggered */
-    public abstract boolean execute();
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
deleted file mode 100644
index a4b1591..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.fusesource.hawtdispatch.Dispatch;
-import org.fusesource.hawtdispatch.Task;
-
-import java.util.LinkedList;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract public class WatchBase {
-
-    private LinkedList<Watch> watches = new LinkedList<Watch>();
-    protected void addWatch(final Watch task) {
-        watches.add(task);
-        fireWatches();
-    }
-
-    protected void fireWatches() {
-        if( !this.watches.isEmpty() ) {
-            Dispatch.getCurrentQueue().execute(new Task(){
-                @Override
-                public void run() {
-                    // Lets see if any of the watches are triggered.
-                    LinkedList<Watch> tmp = watches;
-                    watches = new LinkedList<Watch>();
-                    for (Watch task : tmp) {
-                        if( !task.execute() ) {
-                            watches.add(task);
-                        }
-                    }
-                }
-            });
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
deleted file mode 100644
index d4bc733..0000000
--- a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import static junit.framework.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.net.URISyntaxException;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.hawtdispatch.test.MessengerServer;
-import org.apache.qpid.proton.message.Message;
-import org.fusesource.hawtdispatch.Task;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-/**
- * Hello world!
- *
- */
-
-public class SampleTest {
-
-	private static final Logger _logger = Logger.getLogger(SampleTest.class.getName());
-
-	private MessengerServer server;
-
-	@Before
-	public void startServer() {
-		server = new MessengerServer();
-		server.start();
-	}
-
-	@After
-	public void stopServer() {
-		server.stop();
-	}
-
-	@Test
-	public void test() throws Exception {
-		int expected = 10;
-		final AtomicInteger countdown = new AtomicInteger(expected);
-		AmqpConnectOptions options = new AmqpConnectOptions();
-		final String container = UUID.randomUUID().toString();
-		try {
-			options.setHost(server.getHost(), server.getPort());
-			options.setLocalContainerId(container);
-			options.setUser("anonymous");
-			options.setPassword("changeit");
-		} catch (URISyntaxException e) {
-			e.printStackTrace();
-		}
-		final AmqpConnection conn = AmqpConnection.connect(options );
-		_logger.fine("connection queue");
-		conn.queue().execute(new Task() {
-
-			@Override
-			public void run() {
-				_logger.fine("connection running, setup callbacks");
-				conn.onTransportFailure(new Callback<Throwable>() {
-
-					@Override
-					public void onSuccess(Throwable value) {
-						_logger.fine("transportFailure Success? " + str(value));
-						conn.close();
-					}
-
-					@Override
-					public void onFailure(Throwable value) {
-						_logger.fine("transportFailure Trouble! " + str(value));
-						conn.close();
-					}
-				});
-
-				conn.onConnected(new Callback<Void>() {
-
-					@Override
-					public void onSuccess(Void value) {
-						_logger.fine("on connect Success! in container " + container);
-						final AmqpSession session = conn.createSession();
-						Target rqtarget = new Target();
-						rqtarget.setAddress("rq-tgt");
-						final AmqpSender sender = session.createSender(rqtarget, QoS.AT_LEAST_ONCE, "request-yyy");
-						Source rqsource = new Source();
-						rqsource.setAddress("rs-src");
-						sender.getEndpoint().setSource(rqsource);
-						Source rssource = new Source();
-						rssource.setAddress("rs-src");
-						final AmqpReceiver receiver = session.createReceiver(rssource , QoS.AT_LEAST_ONCE, 10, "response-yyy");
-						Target rstarget = new Target();
-						final String address = "rs-tgt";
-						rstarget.setAddress(address);
-						receiver.getEndpoint().setTarget(rstarget);
-						sender.onRemoteClose(new Callback<ErrorCondition>() {
-
-							@Override
-							public void onSuccess(ErrorCondition value) {
-								_logger.fine("sender remote close!" + str(value));
-							}
-
-							@Override
-							public void onFailure(Throwable value) {
-								_logger.fine("sender remote close Trouble!" + str(value));
-								conn.close();
-
-							}
-
-						});
-						receiver.onRemoteClose(new Callback<ErrorCondition>() {
-
-							@Override
-							public void onSuccess(ErrorCondition value) {
-								_logger.fine("receiver remote close!" + str(value));
-							}
-
-							@Override
-							public void onFailure(Throwable value) {
-								_logger.fine("receiver remote close Trouble!" + str(value));
-								conn.close();
-
-							}
-
-						});
-
-						final Task work = new Task() {
-
-							private AtomicInteger count = new AtomicInteger();
-
-							@Override
-							public void run() {
-								Message message = session.createTextMessage("hello world! " + String.valueOf(count.incrementAndGet()));
-								message.setAddress("amqp://joze/rq-src");
-								String reply_to = "amqp://" + container + "/" + address;
-								message.setReplyTo(reply_to);
-								message.setCorrelationId("correlator");
-								final MessageDelivery md = sender.send(message);
-								md.onRemoteStateChange(new Callback<DeliveryState>() {
-
-									@Override
-									public void onSuccess(DeliveryState value) {
-										_logger.fine("delivery remote state change! " + str(value) +
-												" local: "+ str(md.getLocalState()) +
-												" remote: " + str(md.getRemoteState()));
-									}
-
-									@Override
-									public void onFailure(Throwable value) {
-										_logger.fine("remote state change Trouble!" + str(value));
-										conn.close();
-									}
-
-								});
-								md.onSettle(new Callback<DeliveryState>() {
-
-									@Override
-									public void onSuccess(DeliveryState value) {
-										_logger.fine("delivery settled! " + str(value) +
-												" local: "+ str(md.getLocalState()) +
-												" remote: " + str(md.getRemoteState()));
-										_logger.fine("sender settle mode state " +
-												" local receiver " + str(sender.getEndpoint().getReceiverSettleMode()) +
-												" local sender " + str(sender.getEndpoint().getSenderSettleMode()) +
-												" remote receiver " + str(sender.getEndpoint().getRemoteReceiverSettleMode()) +
-												" remote sender " + str(sender.getEndpoint().getRemoteSenderSettleMode()) +
-												""
-												);
-									}
-
-									@Override
-									public void onFailure(Throwable value) {
-										_logger.fine("delivery sending Trouble!" + str(value));
-										conn.close();
-									}
-								});
-							}
-
-						};
-						receiver.setDeliveryListener(new AmqpDeliveryListener() {
-
-							@Override
-							public void onMessageDelivery(
-									MessageDelivery delivery) {
-								Message message = delivery.getMessage();
-								_logger.fine("incoming message delivery! " +
-										" local " + str(delivery.getLocalState()) +
-										" remote " + str(delivery.getRemoteState()) +
-										" message " + str(message.getBody()) +
-										"");
-								delivery.onSettle(new Callback<DeliveryState>() {
-
-									@Override
-									public void onSuccess(DeliveryState value) {
-										_logger.fine("incoming message settled! ");
-										int i = countdown.decrementAndGet();
-										if ( i > 0 ) {
-											_logger.fine("More work " + str(i));
-											work.run();
-										} else {
-											conn.queue().executeAfter(100, TimeUnit.MILLISECONDS, new Task() {
-
-												@Override
-												public void run() {
-													_logger.fine("stopping sender");
-													sender.close();												
-												}
-											});
-											conn.queue().executeAfter(200, TimeUnit.MILLISECONDS, new Task() {
-
-												@Override
-												public void run() {
-													_logger.fine("stopping receiver");
-													receiver.close();
-
-												}
-											});
-											conn.queue().executeAfter(300, TimeUnit.MILLISECONDS, new Task() {
-
-												@Override
-												public void run() {
-													_logger.fine("stopping session");
-													session.close();
-
-												}
-											});
-											conn.queue().executeAfter(400, TimeUnit.MILLISECONDS, new Task() {
-
-												@Override
-												public void run() {
-													_logger.fine("stopping connection");
-													conn.close();
-
-												}
-											});
-										}
-									}
-
-									@Override
-									public void onFailure(Throwable value) {
-										_logger.fine("trouble settling incoming message " + str(value));
-										conn.close();
-									}
-								});
-								delivery.settle();
-							}
-
-						});
-
-						// start the receiver
-						receiver.resume();
-
-						// send first message
-						conn.queue().execute(work);
-					}
-
-					@Override
-					public void onFailure(Throwable value) {
-						_logger.fine("on connect Failure?" + str(value));
-						conn.close();
-					}
-				});
-				_logger.fine("connection setup done");
-
-
-			}
-
-		});
-		try {
-			_logger.fine("Waiting...");
-			Future<Void> disconnectedFuture = conn.getDisconnectedFuture();
-			disconnectedFuture.await(10, TimeUnit.SECONDS);
-			_logger.fine("done");
-			assertEquals(expected, server.getMessagesReceived());
-		} catch (Exception e) {
-			_logger.log(Level.SEVERE, "Test failed, possibly due to timeout", e);
-			throw e;
-		}
-	}
-
-	private String str(Object value) {
-		if (value == null)
-			return "null";
-		return value.toString();
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
deleted file mode 100644
index 96c772b..0000000
--- a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.test;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.qpid.proton.InterruptException;
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.Tracker;
-
-public class MessengerServer {
-	public static final String REJECT_ME = "*REJECT-ME*";
-	private int timeout = 1000;
-	private String host = "127.0.0.1";
-	private int port = 55555;
-	private Messenger msgr;
-	private AtomicInteger messagesReceived = new AtomicInteger(0);
-	private AtomicInteger messagesSent = new AtomicInteger(0);
-	private AtomicBoolean serverShouldRun = new AtomicBoolean();
-	private AtomicReference<Throwable> issues = new AtomicReference<Throwable>();
-	private Thread thread;
-	private CountDownLatch serverStart;
-
-	public MessengerServer() {
-	}
-	public void start() {
-		if (!serverShouldRun.compareAndSet(false, true)) {
-			throw new IllegalStateException("started twice");
-		}
-		msgr = Proton.messenger();
-		serverStart = new CountDownLatch(1);
-		thread = new Thread(new Runnable() {
-
-			@Override
-			public void run() {
-				try {
-					msgr.start();
-					msgr.subscribe("amqp://~"+host+":"+String.valueOf(port));
-					serverStart.countDown();
-					try {
-						while(serverShouldRun.get()) {
-							msgr.recv(100);
-							while (msgr.incoming() > 0) {
-								Message msg = msgr.get();
-								messagesReceived.incrementAndGet();
-								Tracker tracker = msgr.incomingTracker();
-								if (REJECT_ME.equals(msg.getBody())) {
-									msgr.reject(tracker , 0);
-								} else {
-									msgr.accept(tracker, 0);
-								}
-								String reply_to = msg.getReplyTo();
-								if (reply_to != null) {
-									msg.setAddress(reply_to);
-									msgr.put(msg);
-									msgr.settle(msgr.outgoingTracker(), 0);
-								}
-							}
-						}
-					} finally {
-						msgr.stop();
-					}
-				} catch (InterruptException ex) {
-					// we're done
-				} catch (Exception ex) {
-					issues.set(ex);
-				}
-			}
-
-		});
-		thread.setName("MessengerServer");
-		thread.setDaemon(true);
-		thread.start();
-		try {
-			serverStart.await();
-		} catch (InterruptedException e) {
-			msgr.interrupt();
-		}
-	}
-
-	public void stop() {
-		if (!serverShouldRun.compareAndSet(true, false)) {
-			return;
-		}
-		if (serverStart.getCount() == 0)
-			msgr.interrupt();
-		try {
-			thread.join(timeout);
-		} catch (InterruptedException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-		thread = null;
-		if (!msgr.stopped())
-			msgr.stop();
-		Throwable throwable = issues.get();
-		if (throwable != null)
-			throw new RuntimeException("Messenger server had problems", throwable);
-	}
-
-	public String getHost() {
-		return host;
-	}
-
-	public int getPort() {
-		return port;
-	}
-
-	public void setHost(String host) {
-		this.host = host;
-	}
-
-	public void setPort(int port) {
-		this.port = port;
-	}
-	public int getTimeout() {
-		return timeout;
-	}
-	public void setTimeout(int timeout) {
-		this.timeout = timeout;
-	}
-
-	public int getMessagesReceived() {
-		return messagesReceived.get();
-	}
-
-	public int getMessagesSent() {
-		return messagesSent.get();
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/pom.xml b/contrib/proton-jms/pom.xml
deleted file mode 100644
index f1a51c6..0000000
--- a/contrib/proton-jms/pom.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-  
-  http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <parent>
-    <groupId>org.apache.qpid</groupId>
-    <artifactId>proton-project</artifactId>
-    <version>0.13.0-SNAPSHOT</version>
-    <relativePath>../..</relativePath>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>proton-jms</artifactId>
-  <name>proton-jms</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.qpid</groupId>
-      <artifactId>proton-j</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.geronimo.specs</groupId>
-      <artifactId>geronimo-jms_1.1_spec</artifactId>
-      <version>1.1.1</version>
-      <scope>provided</scope>
-    </dependency>
-  </dependencies>
-
-  <build> 
-  </build>
-  <scm>
-    <url>http://svn.apache.org/viewvc/qpid/proton/</url>
-  </scm>
-
-</project>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java
deleted file mode 100644
index 4beb401..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import javax.jms.Message;
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
-
-
-    public AMQPNativeInboundTransformer(JMSVendor vendor) {
-        super(vendor);
-    }
-
-    @Override
-    public Message transform(EncodedMessage amqpMessage) throws Exception {
-        org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
-
-        Message rc = super.transform(amqpMessage);
-
-        populateMessage(rc, amqp);
-        return rc;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
deleted file mode 100644
index 7dc71d7..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.messaging.Header;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageFormatException;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.message.ProtonJMessage;
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class AMQPNativeOutboundTransformer extends OutboundTransformer {
-
-    public AMQPNativeOutboundTransformer(JMSVendor vendor) {
-        super(vendor);
-    }
-
-    @Override
-    public EncodedMessage transform(Message msg) throws Exception {
-        if( msg == null )
-            return null;
-        if( !(msg instanceof BytesMessage) )
-            return null;
-        try {
-            if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
-                return null;
-            }
-        } catch (MessageFormatException e) {
-            return null;
-        }
-        return transform(this, (BytesMessage) msg);
-    }
-
-    static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
-        long messageFormat;
-        try {
-            messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
-        } catch (MessageFormatException e) {
-            return null;
-        }
-        byte data[] = new byte[(int) msg.getBodyLength()];
-        int dataSize = data.length;
-        msg.readBytes(data);
-        msg.reset();
-
-        try {
-            int count = msg.getIntProperty("JMSXDeliveryCount");
-            if( count > 1 ) {
-
-                // decode...
-                ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
-                int offset = 0;
-                int len = data.length;
-                while( len > 0 ) {
-                    final int decoded = amqp.decode(data, offset, len);
-                    assert decoded > 0: "Make progress decoding the message";
-                    offset += decoded;
-                    len -= decoded;
-                }
-
-                // Update the DeliveryCount header...
-                // The AMQP delivery-count field only includes prior failed delivery attempts,
-                // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
-                if (amqp.getHeader() == null) {
-                    amqp.setHeader(new Header());
-                }
-
-                amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
-
-                // Re-encode...
-                ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
-                final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
-                int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
-                if( overflow.position() > 0 ) {
-                    buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
-                    c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
-                }
-                data = buffer.array();
-                dataSize = c;
-            }
-        } catch (JMSException e) {
-        }
-
-        return new EncodedMessage(messageFormat, data, 0, dataSize);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java
deleted file mode 100644
index 9baabdf..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import javax.jms.BytesMessage;
-import javax.jms.Message;
-
-public class AMQPRawInboundTransformer extends InboundTransformer {
-
-    public AMQPRawInboundTransformer(JMSVendor vendor) {
-        super(vendor);
-    }
-
-    @Override
-    public Message transform(EncodedMessage amqpMessage) throws Exception {
-        BytesMessage rc = vendor.createBytesMessage();
-        rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
-
-        rc.setJMSDeliveryMode(defaultDeliveryMode);
-        rc.setJMSPriority(defaultPriority);
-
-        final long now = System.currentTimeMillis();
-        rc.setJMSTimestamp(now);
-        if( defaultTtl > 0 ) {
-            rc.setJMSExpiration(now + defaultTtl);
-        }
-
-        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        rc.setBooleanProperty(prefixVendor + "NATIVE", true);
-
-        return rc;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java
deleted file mode 100644
index f62760b..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import javax.jms.BytesMessage;
-import javax.jms.Message;
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
-
-    private final JMSMappingOutboundTransformer transformer;
-
-    public AutoOutboundTransformer(JMSVendor vendor) {
-        super(vendor);
-        transformer = new JMSMappingOutboundTransformer(vendor);
-    }
-
-    @Override
-    public EncodedMessage transform(Message msg) throws Exception {
-        if( msg == null )
-            return null;
-        if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
-            if( msg instanceof BytesMessage ) {
-                return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg);
-            } else {
-                return null;
-            }
-        } else {
-            return transformer.transform(msg);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java
deleted file mode 100644
index 19602c9..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.amqp.Binary;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class EncodedMessage
-{
-
-    private final Binary data;
-    final long messageFormat;
-
-    public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
-        this.data = new Binary(data, offset, length);
-        this.messageFormat = messageFormat;
-    }
-
-    public long getMessageFormat() {
-        return messageFormat;
-    }
-
-    public Message decode() throws Exception {
-        Message amqp = Message.Factory.create();
-
-        int offset = getArrayOffset();
-        int len = getLength();
-        while( len > 0 ) {
-            final int decoded = amqp.decode(getArray(), offset, len);
-            assert decoded > 0: "Make progress decoding the message";
-            offset += decoded;
-            len -= decoded;
-        }
-
-        return amqp;
-    }
-
-    public int getLength()
-    {
-        return data.getLength();
-    }
-
-    public int getArrayOffset()
-    {
-        return data.getArrayOffset();
-    }
-
-    public byte[] getArray()
-    {
-        return data.getArray();
-    }
-
-    @Override
-    public String toString()
-    {
-        return data.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java
deleted file mode 100644
index 0374e6a..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.amqp.*;
-import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Footer;
-import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Properties;
-
-import javax.jms.*;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Queue;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public abstract class InboundTransformer {
-
-    JMSVendor vendor;
-
-    public static final String TRANSFORMER_NATIVE = "native";
-    public static final String TRANSFORMER_RAW = "raw";
-    public static final String TRANSFORMER_JMS = "jms";
-
-    String prefixVendor = "JMS_AMQP_";
-    String prefixDeliveryAnnotations = "DA_";
-    String prefixMessageAnnotations= "MA_";
-    String prefixFooter = "FT_";
-
-    int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
-    int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
-    long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
-
-    public InboundTransformer(JMSVendor vendor) {
-        this.vendor = vendor;
-    }
-
-    abstract public Message transform(EncodedMessage amqpMessage) throws Exception;
-
-    public int getDefaultDeliveryMode() {
-        return defaultDeliveryMode;
-    }
-
-    public void setDefaultDeliveryMode(int defaultDeliveryMode) {
-        this.defaultDeliveryMode = defaultDeliveryMode;
-    }
-
-    public int getDefaultPriority() {
-        return defaultPriority;
-    }
-
-    public void setDefaultPriority(int defaultPriority) {
-        this.defaultPriority = defaultPriority;
-    }
-
-    public long getDefaultTtl() {
-        return defaultTtl;
-    }
-
-    public void setDefaultTtl(long defaultTtl) {
-        this.defaultTtl = defaultTtl;
-    }
-
-    public String getPrefixVendor() {
-        return prefixVendor;
-    }
-
-    public void setPrefixVendor(String prefixVendor) {
-        this.prefixVendor = prefixVendor;
-    }
-
-    public JMSVendor getVendor() {
-        return vendor;
-    }
-
-    public void setVendor(JMSVendor vendor) {
-        this.vendor = vendor;
-    }
-
-    protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
-        Header header = amqp.getHeader();
-        if( header==null ) {
-            header = new Header();
-        }
-
-        if( header.getDurable()!=null ) {
-            jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-        } else {
-            jms.setJMSDeliveryMode(defaultDeliveryMode);
-        }
-        if( header.getPriority()!=null ) {
-            jms.setJMSPriority(header.getPriority().intValue());
-        } else {
-            jms.setJMSPriority(defaultPriority);
-        }
-        if( header.getFirstAcquirer() !=null ) {
-            jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
-        }
-        if( header.getDeliveryCount()!=null ) {
-            vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
-        }
-
-        final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
-        if( da!=null ) {
-            for (Map.Entry<?,?> entry : da.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
-            }
-        }
-
-        Class<? extends Destination> toAttributes = Destination.class;
-        Class<? extends Destination> replyToAttributes = Destination.class;
-
-        final MessageAnnotations ma = amqp.getMessageAnnotations();
-        if( ma!=null ) {
-            for (Map.Entry<?,?> entry : ma.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                if( "x-opt-jms-type".equals(key.toString()) && entry.getValue() != null ) {
-                    jms.setJMSType(entry.getValue().toString());
-                } else if( "x-opt-to-type".equals(key.toString()) ) {
-                    toAttributes = toClassFromAttributes(entry.getValue().toString());
-                } else if( "x-opt-reply-type".equals(key.toString()) ) {
-                    replyToAttributes = toClassFromAttributes(entry.getValue().toString());
-                } else {
-                    setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
-                }
-            }
-        }
-
-        final ApplicationProperties ap = amqp.getApplicationProperties();
-        if( ap !=null ) {
-            for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                if( "JMSXGroupID".equals(key) ) {
-                    vendor.setJMSXGroupID(jms, entry.getValue().toString());
-                } else if( "JMSXGroupSequence".equals(key) ) {
-                    vendor.setJMSXGroupSequence(jms, ((Number)entry.getValue()).intValue());
-                } else if( "JMSXUserID".equals(key) ) {
-                    vendor.setJMSXUserID(jms, entry.getValue().toString());
-                } else {
-                    setProperty(jms, key, entry.getValue());
-                }
-            }
-        }
-
-        final Properties properties = amqp.getProperties();
-        if( properties!=null ) {
-            if( properties.getMessageId()!=null ) {
-                jms.setJMSMessageID(properties.getMessageId().toString());
-            }
-            Binary userId = properties.getUserId();
-            if( userId!=null ) {
-                vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
-            }
-            if( properties.getTo()!=null ) {
-                jms.setJMSDestination(vendor.createDestination(properties.getTo(), toAttributes));
-            }
-            if( properties.getSubject()!=null ) {
-                jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
-            }
-            if( properties.getReplyTo() !=null ) {
-                jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo(), replyToAttributes));
-            }
-            if( properties.getCorrelationId() !=null ) {
-                jms.setJMSCorrelationID(properties.getCorrelationId().toString());
-            }
-            if( properties.getContentType() !=null ) {
-                jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
-            }
-            if( properties.getContentEncoding() !=null ) {
-                jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
-            }
-            if( properties.getCreationTime()!=null ) {
-                jms.setJMSTimestamp(properties.getCreationTime().getTime());
-            }
-            if( properties.getGroupId()!=null ) {
-                vendor.setJMSXGroupID(jms, properties.getGroupId());
-            }
-            if( properties.getGroupSequence()!=null ) {
-                vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
-            }
-            if( properties.getReplyToGroupId()!=null ) {
-                jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
-            }
-            if( properties.getAbsoluteExpiryTime()!=null ) {
-                jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
-            }
-        }
-
-        // If the jms expiration has not yet been set...
-        if( jms.getJMSExpiration()==0 ) {
-            // Then lets try to set it based on the message ttl.
-            long ttl = defaultTtl;
-            if( header.getTtl()!=null ) {
-                ttl = header.getTtl().longValue();
-            }
-            if( ttl == 0 ) {
-              jms.setJMSExpiration(0);
-            } else {
-                jms.setJMSExpiration(System.currentTimeMillis()+ttl);
-            }
-        }
-
-        final Footer fp = amqp.getFooter();
-        if( fp !=null ) {
-            for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
-            }
-        }
-    }
-
-    private static final Set<String> QUEUE_ATTRIBUTES = createSet("queue");
-    private static final Set<String> TOPIC_ATTRIBUTES = createSet("topic");
-    private static final Set<String> TEMP_QUEUE_ATTRIBUTES = createSet("queue", "temporary");
-    private static final Set<String> TEMP_TOPIC_ATTRIBUTES = createSet("topic", "temporary");
-
-    private static Set<String> createSet(String ... args) {
-        HashSet<String> s = new HashSet<String>();
-        for (String arg : args)
-        {
-            s.add(arg);
-        }
-        return Collections.unmodifiableSet(s);
-    }
-
-    Class<? extends Destination> toClassFromAttributes(String value)
-    {
-        if( value ==null ) {
-            return null;
-        }
-        HashSet<String> attributes = new HashSet<String>();
-        for( String x: value.split("\\s*,\\s*") ) {
-            attributes.add(x);
-        }
-
-        if( QUEUE_ATTRIBUTES.equals(attributes) ) {
-            return Queue.class;
-        }
-        if( TOPIC_ATTRIBUTES.equals(attributes) ) {
-            return Topic.class;
-        }
-        if( TEMP_QUEUE_ATTRIBUTES.equals(attributes) ) {
-            return TemporaryQueue.class;
-        }
-        if( TEMP_TOPIC_ATTRIBUTES.equals(attributes) ) {
-            return TemporaryTopic.class;
-        }
-        return Destination.class;
-    }
-
-
-    private void setProperty(Message msg, String key, Object value) throws JMSException {
-        if( value instanceof UnsignedLong) {
-            long v = ((UnsignedLong) value).longValue();
-            msg.setLongProperty(key, v);
-        } else if( value instanceof UnsignedInteger) {
-            long v = ((UnsignedInteger) value).longValue();
-            if( Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE ) {
-                msg.setIntProperty(key, (int) v);
-            } else {
-                msg.setLongProperty(key, v);
-            }
-        } else if( value instanceof UnsignedShort) {
-            int v = ((UnsignedShort) value).intValue();
-            if( Short.MIN_VALUE <= v && v <= Short.MAX_VALUE ) {
-                msg.setShortProperty(key, (short) v);
-            } else {
-                msg.setIntProperty(key, v);
-            }
-        } else if( value instanceof UnsignedByte) {
-            short v = ((UnsignedByte) value).shortValue();
-            if( Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE ) {
-                msg.setByteProperty(key, (byte) v);
-            } else {
-                msg.setShortProperty(key, v);
-            }
-        } else if( value instanceof Symbol) {
-            msg.setStringProperty(key, value.toString());
-        } else if( value instanceof Decimal128 ) {
-            msg.setDoubleProperty(key, ((Decimal128)value).doubleValue());
-        } else if( value instanceof Decimal64 ) {
-            msg.setDoubleProperty(key, ((Decimal64)value).doubleValue());
-        } else if( value instanceof Decimal32 ) {
-            msg.setFloatProperty(key, ((Decimal32)value).floatValue());
-        } else if( value instanceof Binary ) {
-            msg.setStringProperty(key, value.toString());
-        } else {
-            msg.setObjectProperty(key, value);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformer.java
deleted file mode 100644
index 82c03c6..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.amqp.messaging.*;
-
-import javax.jms.*;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class JMSMappingInboundTransformer extends InboundTransformer {
-
-    public JMSMappingInboundTransformer(JMSVendor vendor) {
-        super(vendor);
-    }
-
-    @Override
-    public Message transform(EncodedMessage amqpMessage) throws Exception {
-        org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
-
-        Message rc;
-        final Section body = amqp.getBody();
-        if( body == null ) {
-            rc = vendor.createMessage();
-        } else if( body instanceof Data ) {
-            Binary d = ((Data) body).getValue();
-            BytesMessage m = vendor.createBytesMessage();
-            m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
-            rc = m;
-        } else if (body instanceof AmqpSequence ) {
-            AmqpSequence sequence = (AmqpSequence) body;
-            StreamMessage m = vendor.createStreamMessage();
-            for( Object item : sequence.getValue()) {
-                m.writeObject(item);
-            }
-            rc = m;
-        } else if (body instanceof AmqpValue) {
-            Object value = ((AmqpValue) body).getValue();
-            if( value == null ) {
-                rc = vendor.createObjectMessage();
-            } if( value instanceof String ) {
-                TextMessage m = vendor.createTextMessage();
-                m.setText((String) value);
-                rc = m;
-            } else if( value instanceof Binary ) {
-                Binary d = (Binary) value;
-                BytesMessage m = vendor.createBytesMessage();
-                m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
-                rc = m;
-            } else if( value instanceof List) {
-                StreamMessage m = vendor.createStreamMessage();
-                for( Object item : (List) value) {
-                    m.writeObject(item);
-                }
-                rc = m;
-            } else if( value instanceof Map) {
-                MapMessage m = vendor.createMapMessage();
-                final Set<Map.Entry<String, Object>> set = ((Map) value).entrySet();
-                for (Map.Entry<String, Object> entry : set) {
-                    m.setObject(entry.getKey(), entry.getValue());
-                }
-                rc = m;
-            } else {
-                ObjectMessage m = vendor.createObjectMessage();
-                m.setObject((Serializable) value);
-                rc = m;
-            }
-        } else {
-            throw new RuntimeException("Unexpected body type: "+body.getClass());
-        }
-        rc.setJMSDeliveryMode(defaultDeliveryMode);
-        rc.setJMSPriority(defaultPriority);
-        rc.setJMSExpiration(defaultTtl);
-
-        populateMessage(rc, amqp);
-
-        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        rc.setBooleanProperty(prefixVendor + "NATIVE", false);
-        return rc;
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org