You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2012/11/28 22:48:40 UTC

svn commit: r1414946 [2/2] - in /qpid/proton/trunk/proton-j: ./ contrib/proton-hawtdispatch/ contrib/proton-hawtdispatch/src/ contrib/proton-hawtdispatch/src/main/ contrib/proton-hawtdispatch/src/main/java/ contrib/proton-hawtdispatch/src/main/java/org...

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.hawtdispatch.api.AmqpConnectOptions;
+import org.apache.qpid.proton.hawtdispatch.api.Callback;
+import org.apache.qpid.proton.hawtdispatch.api.ChainedCallback;
+import org.apache.qpid.proton.hawtdispatch.api.TransportState;
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtdispatch.*;
+import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
+import org.fusesource.hawtdispatch.transport.SslTransport;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+import org.fusesource.hawtdispatch.transport.Transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedList;
+
+import static org.apache.qpid.proton.hawtdispatch.api.TransportState.*;
+import static org.fusesource.hawtdispatch.Dispatch.NOOP;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpTransport extends WatchBase {
+
+    private TransportState state = CREATED;
+
+    final DispatchQueue queue;
+    final ConnectionImpl connection = new ConnectionImpl();
+    Transport hawtdispatchTransport;
+    TransportImpl protonTransport;
+    Throwable failure;
+    CustomDispatchSource<Defer,LinkedList<Defer>> defers;
+
+    public static final EnumSet<EndpointState> ALL_SET = EnumSet.allOf(EndpointState.class);
+
+    private AmqpTransport(DispatchQueue queue) {
+        this.queue = queue;
+        defers = Dispatch.createSource(EventAggregators.<Defer>linkedList(), this.queue);
+        defers.setEventHandler(new Task(){
+            public void run() {
+                for( Defer defer: defers.getData() ) {
+                    assert defer.defered = true;
+                    defer.defered = false;
+                    defer.run();
+                }
+            }
+        });
+        defers.resume();
+    }
+
+    static public AmqpTransport connect(AmqpConnectOptions options) {
+        AmqpConnectOptions opts = options.clone();
+        if( opts.getDispatchQueue() == null ) {
+            opts.setDispatchQueue(Dispatch.createQueue());
+        }
+        if( opts.getBlockingExecutor() == null ) {
+            opts.setBlockingExecutor(AmqpConnectOptions.getBlockingThreadPool());
+        }
+        return new AmqpTransport(opts.getDispatchQueue()).connecting(opts);
+    }
+
+    private AmqpTransport connecting(final AmqpConnectOptions options) {
+        assert state == CREATED;
+        try {
+            state = CONNECTING;
+            if( options.getLocalContainerId()!=null ) {
+                connection.setLocalContainerId(options.getLocalContainerId());
+            }
+            if( options.getRemoteContainerId()!=null ) {
+                connection.setContainer(options.getRemoteContainerId());
+            }
+            connection.setHostname(options.getHost().getHost());
+            Callback<Void> onConnect = new Callback<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    if( state == CONNECTED ) {
+                        hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
+                        fireWatches();
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable value) {
+                    if( state == CONNECTED ) {
+                        failure = value;
+                        disconnect();
+                        fireWatches();
+                    }
+                }
+            };
+            if( options.getUser()!=null ) {
+                onConnect = new SaslClientHandler(options, onConnect);
+            }
+            createTransport(options, onConnect);
+        } catch (Throwable e) {
+            failure = e;
+        }
+        fireWatches();
+        return this;
+    }
+
+    public TransportState getState() {
+        return state;
+    }
+
+    /**
+     * Creates and start a transport to the AMQP server.  Passes it to the onConnect
+     * once the transport is connected.
+     *
+     * @param onConnect
+     * @throws Exception
+     */
+    void createTransport(AmqpConnectOptions options, final Callback<Void> onConnect) throws Exception {
+        final TcpTransport transport;
+        if( options.getSslContext() !=null ) {
+            SslTransport ssl = new SslTransport();
+            ssl.setSSLContext(options.getSslContext());
+            transport = ssl;
+        } else {
+            transport = new TcpTransport();
+        }
+
+        URI host = options.getHost();
+        if( host.getPort() == -1 ) {
+            if( options.getSslContext()!=null ) {
+                host = new URI(host.getScheme()+"://"+host.getHost()+":5672");
+            } else {
+                host = new URI(host.getScheme()+"://"+host.getHost()+":5671");
+            }
+        }
+
+
+        transport.setBlockingExecutor(options.getBlockingExecutor());
+        transport.setDispatchQueue(options.getDispatchQueue());
+
+        transport.setMaxReadRate(options.getMaxReadRate());
+        transport.setMaxWriteRate(options.getMaxWriteRate());
+        transport.setReceiveBufferSize(options.getReceiveBufferSize());
+        transport.setSendBufferSize(options.getSendBufferSize());
+        transport.setTrafficClass(options.getTrafficClass());
+        transport.setUseLocalHost(options.isUseLocalHost());
+        transport.connecting(host, options.getLocalAddress());
+
+        transport.setTransportListener(new DefaultTransportListener(){
+            public void onTransportConnected() {
+                if(state==CONNECTING) {
+                    state = CONNECTED;
+                    onConnect.onSuccess(null);
+                    transport.resumeRead();
+                }
+            }
+
+            public void onTransportFailure(final IOException error) {
+                if(state==CONNECTING) {
+                    onConnect.onFailure(error);
+                }
+            }
+
+        });
+        transport.connecting(host, options.getLocalAddress());
+        bind(transport);
+        transport.start(NOOP);
+    }
+
+    class SaslClientHandler extends ChainedCallback<Void, Void> {
+
+        private final AmqpConnectOptions options;
+
+        public SaslClientHandler(AmqpConnectOptions options, Callback<Void> next) {
+            super(next);
+            this.options = options;
+        }
+
+        public void onSuccess(final Void value) {
+            final Sasl s = protonTransport.sasl();
+            s.client();
+            pumpOut();
+            hawtdispatchTransport.setTransportListener(new AmqpTransportListener() {
+
+                Sasl sasl = s;
+
+                @Override
+                void process() {
+                    if (sasl != null) {
+                        sasl = processSaslEvent(sasl);
+                        if (sasl == null) {
+                            // once sasl handshake is done.. we need to read the protocol header again.
+                            ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+                        }
+                    }
+                }
+
+                @Override
+                public void onTransportFailure(IOException error) {
+                    next.onFailure(error);
+                }
+
+                @Override
+                void onFailure(Throwable error) {
+                    next.onFailure(error);
+                }
+
+                boolean authSent = false;
+
+                private Sasl processSaslEvent(Sasl sasl) {
+                    if (sasl.getOutcome() == Sasl.SaslOutcome.PN_SASL_OK) {
+                        next.onSuccess(null);
+                        return null;
+                    }
+                    HashSet<String> mechanisims = new HashSet<String>(Arrays.asList(sasl.getRemoteMechanisms()));
+                    if (!authSent && !mechanisims.isEmpty()) {
+                        if (!mechanisims.contains("PLAIN")) {
+                            next.onFailure(Support.illegalState("Remote does not support plain password authentication."));
+                            return null;
+                        }
+                        authSent = true;
+                        DataByteArrayOutputStream os = new DataByteArrayOutputStream();
+                        try {
+                            os.write(new UTF8Buffer(options.getUser()));
+                            os.writeByte(0);
+                            if (options.getPassword() != null) {
+                                os.write(new UTF8Buffer(options.getPassword()));
+                                os.writeByte(0);
+                            }
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                        Buffer buffer = os.toBuffer();
+                        sasl.setMechanisms(new String[]{"PLAIN"});
+                        sasl.send(buffer.data, buffer.offset, buffer.length);
+                    }
+                    return sasl;
+                }
+            });
+        }
+    }
+
+    class SaslServerListener extends AmqpTransportListener {
+        Sasl sasl;
+
+        @Override
+        public void onTransportCommand(Object command) {
+            try {
+                if (command.getClass() == AmqpHeader.class) {
+                    AmqpHeader header = (AmqpHeader)command;
+                    switch( header.getProtocolId() ) {
+                        case 3: // Client will be using SASL for auth..
+                            if( listener!=null ) {
+                                sasl = listener.processSaslConnect(protonTransport);
+                                break;
+                            }
+                        default:
+                            AmqpTransportListener listener = new AmqpTransportListener();
+                            hawtdispatchTransport.setTransportListener(listener);
+                            listener.onTransportCommand(command);
+                            return;
+                    }
+                    command = header.getBuffer();
+                }
+            } catch (Exception e) {
+                onFailure(e);
+            }
+            super.onTransportCommand(command);
+        }
+
+        @Override
+        void process() {
+            if (sasl != null) {
+                sasl = listener.processSaslEvent(sasl);
+            }
+            if (sasl == null) {
+                // once sasl handshake is done.. we need to read the protocol header again.
+                ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+                hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
+            }
+        }
+    }
+
+    static public AmqpTransport accept(Transport transport) {
+        return new AmqpTransport(transport.getDispatchQueue()).accepted(transport);
+    }
+
+    private AmqpTransport accepted(final Transport transport) {
+        state = CONNECTED;
+        bind(transport);
+        hawtdispatchTransport.setTransportListener(new SaslServerListener());
+        return this;
+    }
+
+    private void bind(final Transport transport) {
+        this.hawtdispatchTransport = transport;
+        this.protonTransport = new TransportImpl();
+        this.protonTransport.bind(connection);
+        if( transport.getProtocolCodec()==null ) {
+            try {
+                transport.setProtocolCodec(new AmqpProtocolCodec());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public void defer(Defer defer) {
+        if( !defer.defered ) {
+            defer.defered = true;
+            defers.merge(defer);
+        }
+    }
+
+    public void pumpOut() {
+        assertExecuting();
+        defer(deferedPumpOut);
+    }
+
+    private Defer deferedPumpOut = new Defer() {
+        public void run() {
+            doPumpOut();
+        }
+    };
+
+    private void doPumpOut() {
+        switch(state) {
+            case CONNECTING:
+            case CONNECTED:
+                break;
+            default:
+                return;
+        }
+
+        int size = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize();
+        byte data[] = new byte[size];
+        boolean done = false;
+        int pumped = 0;
+        while( !done && !hawtdispatchTransport.full() ) {
+            int count = protonTransport.output(data, 0, size);
+            if( count > 0 ) {
+                pumped += count;
+                boolean accepted = hawtdispatchTransport.offer(new Buffer(data, 0, count));
+                assert accepted: "Should be accepted since the transport was not full";
+            } else {
+                done = true;
+            }
+        }
+        if( pumped > 0 && !hawtdispatchTransport.full() ) {
+            listener.processRefill();
+        }
+    }
+
+    public Sasl sasl;
+    public void fireListenerEvents() {
+        fireWatches();
+
+        if( sasl!=null ) {
+            sasl = listener.processSaslEvent(sasl);
+            if( sasl==null ) {
+                // once sasl handshake is done.. we need to read the protocol header again.
+                ((AmqpProtocolCodec)this.hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+            }
+        }
+
+        context(connection).fireListenerEvents(listener);
+
+        Session session = connection.sessionHead(ALL_SET, ALL_SET);
+        while(session != null)
+        {
+            context(session).fireListenerEvents(listener);
+            session = session.next(ALL_SET, ALL_SET);
+        }
+
+        Link link = connection.linkHead(ALL_SET, ALL_SET);
+        while(link != null)
+        {
+            context(link).fireListenerEvents(listener);
+            link = link.next(ALL_SET, ALL_SET);
+        }
+
+        Delivery delivery = connection.getWorkHead();
+        while(delivery != null)
+        {
+            listener.processDelivery(delivery);
+            delivery = delivery.getWorkNext();
+        }
+
+        listener.processRefill();
+    }
+
+
+    public ConnectionImpl connection() {
+        return connection;
+    }
+
+    AmqpListener listener = new AmqpListener();
+    public AmqpListener getListener() {
+        return listener;
+    }
+
+    public void setListener(AmqpListener listener) {
+        this.listener = listener;
+    }
+
+    public EndpointContext context(Endpoint endpoint) {
+        EndpointContext context = (EndpointContext) endpoint.getContext();
+        if( context == null ) {
+            context = new EndpointContext(this, endpoint);
+            endpoint.setContext(context);
+        }
+        return context;
+    }
+
+    class AmqpTransportListener extends DefaultTransportListener {
+
+        @Override
+        public void onTransportConnected() {
+            if( listener!=null ) {
+                listener.processTransportConnected();
+            }
+        }
+
+        @Override
+        public void onRefill() {
+            if( listener!=null ) {
+                listener.processRefill();
+            }
+        }
+
+        @Override
+        public void onTransportCommand(Object command) {
+            if( state != CONNECTED ) {
+                return;
+            }
+            try {
+                Buffer buffer;
+                if (command.getClass() == AmqpHeader.class) {
+                    buffer = ((AmqpHeader) command).getBuffer();
+                } else {
+                    buffer = (Buffer) command;
+                }
+                protonTransport.input(buffer.data, buffer.offset, buffer.length);
+                process();
+                pumpOut();
+            } catch (Exception e) {
+                onFailure(e);
+            }
+        }
+
+        void process() {
+            fireListenerEvents();
+        }
+
+        @Override
+        public void onTransportFailure(IOException error) {
+            if( state==CONNECTED ) {
+                failure = error;
+                if( listener!=null ) {
+                    listener.processTransportFailure(error);
+                    fireWatches();
+                }
+            }
+        }
+
+        void onFailure(Throwable error) {
+            failure = error;
+            if( listener!=null ) {
+                listener.processFailure(error);
+                fireWatches();
+            }
+        }
+    }
+
+    public void disconnect() {
+        assertExecuting();
+        if( state == CONNECTING || state==CONNECTED) {
+            state = DISCONNECTING;
+            if( hawtdispatchTransport!=null ) {
+                hawtdispatchTransport.stop(new Task(){
+                    public void run() {
+                        state = DISCONNECTED;
+                        hawtdispatchTransport = null;
+                        protonTransport = null;
+                        fireWatches();
+                    }
+                });
+            }
+        }
+    }
+
+    public DispatchQueue queue() {
+        return queue;
+    }
+
+    public void assertExecuting() {
+        queue().assertExecuting();
+    }
+
+    public void onTransportConnected(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( failure !=null ) {
+                    cb.onFailure(failure);
+                    return true;
+                }
+                if( state!=CONNECTING ) {
+                    cb.onSuccess(null);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    public void onTransportDisconnected(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( state==DISCONNECTED ) {
+                    cb.onSuccess(null);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    public void onTransportFailure(final Callback<Throwable> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( failure!=null ) {
+                    cb.onSuccess(failure);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    public Throwable getFailure() {
+        return failure;
+    }
+
+    public void setProtocolTracer(ProtocolTracer protocolTracer) {
+        protonTransport.setProtocolTracer(protocolTracer);
+    }
+
+    public ProtocolTracer getProtocolTracer() {
+        return protonTransport.getProtocolTracer();
+    }
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class Defer extends Task {
+    boolean defered;
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import java.nio.ByteBuffer;
+
+public class DroppingWritableBuffer implements WritableBuffer
+{
+    int pos = 0;
+
+    @Override
+    public boolean hasRemaining() {
+        return true;
+    }
+
+    @Override
+    public void put(byte b) {
+        pos += 1;
+    }
+
+    @Override
+    public void putFloat(float f) {
+        pos += 4;
+    }
+
+    @Override
+    public void putDouble(double d) {
+        pos += 8;
+    }
+
+    @Override
+    public void put(byte[] src, int offset, int length) {
+        pos += length;
+    }
+
+    @Override
+    public void putShort(short s) {
+        pos += 2;
+    }
+
+    @Override
+    public void putInt(int i) {
+        pos += 4;
+    }
+
+    @Override
+    public void putLong(long l) {
+        pos += 8;
+    }
+
+    @Override
+    public int remaining() {
+        return limit() - pos;
+    }
+
+    @Override
+    public int position() {
+        return pos;
+    }
+
+    int resetPos;
+    @Override
+    public void position(int position) {
+        resetPos = pos;
+        pos = position;
+    }
+
+    @Override
+    public void put(ByteBuffer payload) {
+        pos += payload.remaining();
+        payload.position(payload.limit());
+    }
+
+    public int limit() {
+        return Integer.MAX_VALUE;
+    }
+
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class EndpointContext {
+
+    private final AmqpTransport transport;
+    private final Endpoint endpoint;
+    private Object attachment;
+    boolean listenerProcessing;
+
+    public EndpointContext(AmqpTransport transport, Endpoint endpoint) {
+        this.transport = transport;
+        this.endpoint = endpoint;
+    }
+
+    class ProcessedTask extends Task {
+        @Override
+        public void run() {
+            transport.assertExecuting();
+            listenerProcessing = false;
+            transport.pumpOut();
+        }
+    }
+
+    public void fireListenerEvents(AmqpListener listener) {
+        if( listener!=null && !listenerProcessing ) {
+            if( endpoint.getLocalState() == EndpointState.UNINITIALIZED &&
+                endpoint.getRemoteState() != EndpointState.UNINITIALIZED ) {
+                listenerProcessing = true;
+                listener.processRemoteOpen(endpoint, new ProcessedTask());
+            } else if( endpoint.getLocalState() == EndpointState.ACTIVE &&
+                endpoint.getRemoteState() == EndpointState.CLOSED ) {
+                listenerProcessing = true;
+                listener.processRemoteClose(endpoint, new ProcessedTask());
+            }
+        }
+        if( attachment !=null && attachment instanceof Task ) {
+            ((Task) attachment).run();
+        }
+    }
+
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    public <T> T getAttachment(Class<T> clazz) {
+        return clazz.cast(getAttachment());
+    }
+
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Support {
+
+    public static IllegalStateException illegalState(String msg) {
+        return (IllegalStateException) new IllegalStateException(msg).fillInStackTrace();
+    }
+
+    public static IllegalStateException createUnhandledEventError() {
+        return illegalState("Unhandled event.");
+    }
+
+    public static IllegalStateException createListenerNotSetError() {
+        return illegalState("No connection listener set to handle message received from the server.");
+    }
+
+    public static IllegalStateException createDisconnectedError() {
+        return illegalState("Disconnected");
+    }
+
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public abstract class Watch {
+    /* returns true if the watch has been triggered */
+    public abstract boolean execute();
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.Task;
+
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class WatchBase {
+
+    private LinkedList<Watch> watches = new LinkedList<Watch>();
+    protected void addWatch(final Watch task) {
+        watches.add(task);
+        fireWatches();
+    }
+
+    protected void fireWatches() {
+        if( !this.watches.isEmpty() ) {
+            Dispatch.getCurrentQueue().execute(new Task(){
+                @Override
+                public void run() {
+                    // Lets see if any of the watches are triggered.
+                    LinkedList<Watch> tmp = watches;
+                    watches = new LinkedList<Watch>();
+                    for (Watch task : tmp) {
+                        if( !task.execute() ) {
+                            watches.add(task);
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+}

Modified: qpid/proton/trunk/proton-j/pom.xml
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/pom.xml?rev=1414946&r1=1414945&r2=1414946&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/pom.xml (original)
+++ qpid/proton/trunk/proton-j/pom.xml Wed Nov 28 21:48:33 2012
@@ -65,6 +65,7 @@
   <modules>
     <module>proton</module>
     <module>contrib/proton-jms</module>
+    <module>contrib/proton-hawtdispatch</module>
   </modules>
     
 </project>



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org