You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2012/11/28 22:48:40 UTC
svn commit: r1414946 [2/2] - in /qpid/proton/trunk/proton-j: ./
contrib/proton-hawtdispatch/ contrib/proton-hawtdispatch/src/
contrib/proton-hawtdispatch/src/main/
contrib/proton-hawtdispatch/src/main/java/
contrib/proton-hawtdispatch/src/main/java/org...
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.hawtdispatch.api.AmqpConnectOptions;
+import org.apache.qpid.proton.hawtdispatch.api.Callback;
+import org.apache.qpid.proton.hawtdispatch.api.ChainedCallback;
+import org.apache.qpid.proton.hawtdispatch.api.TransportState;
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtdispatch.*;
+import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
+import org.fusesource.hawtdispatch.transport.SslTransport;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+import org.fusesource.hawtdispatch.transport.Transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedList;
+
+import static org.apache.qpid.proton.hawtdispatch.api.TransportState.*;
+import static org.fusesource.hawtdispatch.Dispatch.NOOP;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpTransport extends WatchBase {
+
+ private TransportState state = CREATED;
+
+ final DispatchQueue queue;
+ final ConnectionImpl connection = new ConnectionImpl();
+ Transport hawtdispatchTransport;
+ TransportImpl protonTransport;
+ Throwable failure;
+ CustomDispatchSource<Defer,LinkedList<Defer>> defers;
+
+ public static final EnumSet<EndpointState> ALL_SET = EnumSet.allOf(EndpointState.class);
+
+ private AmqpTransport(DispatchQueue queue) {
+ this.queue = queue;
+ defers = Dispatch.createSource(EventAggregators.<Defer>linkedList(), this.queue);
+ defers.setEventHandler(new Task(){
+ public void run() {
+ for( Defer defer: defers.getData() ) {
+ assert defer.defered = true;
+ defer.defered = false;
+ defer.run();
+ }
+ }
+ });
+ defers.resume();
+ }
+
+ static public AmqpTransport connect(AmqpConnectOptions options) {
+ AmqpConnectOptions opts = options.clone();
+ if( opts.getDispatchQueue() == null ) {
+ opts.setDispatchQueue(Dispatch.createQueue());
+ }
+ if( opts.getBlockingExecutor() == null ) {
+ opts.setBlockingExecutor(AmqpConnectOptions.getBlockingThreadPool());
+ }
+ return new AmqpTransport(opts.getDispatchQueue()).connecting(opts);
+ }
+
+ private AmqpTransport connecting(final AmqpConnectOptions options) {
+ assert state == CREATED;
+ try {
+ state = CONNECTING;
+ if( options.getLocalContainerId()!=null ) {
+ connection.setLocalContainerId(options.getLocalContainerId());
+ }
+ if( options.getRemoteContainerId()!=null ) {
+ connection.setContainer(options.getRemoteContainerId());
+ }
+ connection.setHostname(options.getHost().getHost());
+ Callback<Void> onConnect = new Callback<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ if( state == CONNECTED ) {
+ hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
+ fireWatches();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable value) {
+ if( state == CONNECTED ) {
+ failure = value;
+ disconnect();
+ fireWatches();
+ }
+ }
+ };
+ if( options.getUser()!=null ) {
+ onConnect = new SaslClientHandler(options, onConnect);
+ }
+ createTransport(options, onConnect);
+ } catch (Throwable e) {
+ failure = e;
+ }
+ fireWatches();
+ return this;
+ }
+
+ public TransportState getState() {
+ return state;
+ }
+
+ /**
+ * Creates and start a transport to the AMQP server. Passes it to the onConnect
+ * once the transport is connected.
+ *
+ * @param onConnect
+ * @throws Exception
+ */
+ void createTransport(AmqpConnectOptions options, final Callback<Void> onConnect) throws Exception {
+ final TcpTransport transport;
+ if( options.getSslContext() !=null ) {
+ SslTransport ssl = new SslTransport();
+ ssl.setSSLContext(options.getSslContext());
+ transport = ssl;
+ } else {
+ transport = new TcpTransport();
+ }
+
+ URI host = options.getHost();
+ if( host.getPort() == -1 ) {
+ if( options.getSslContext()!=null ) {
+ host = new URI(host.getScheme()+"://"+host.getHost()+":5672");
+ } else {
+ host = new URI(host.getScheme()+"://"+host.getHost()+":5671");
+ }
+ }
+
+
+ transport.setBlockingExecutor(options.getBlockingExecutor());
+ transport.setDispatchQueue(options.getDispatchQueue());
+
+ transport.setMaxReadRate(options.getMaxReadRate());
+ transport.setMaxWriteRate(options.getMaxWriteRate());
+ transport.setReceiveBufferSize(options.getReceiveBufferSize());
+ transport.setSendBufferSize(options.getSendBufferSize());
+ transport.setTrafficClass(options.getTrafficClass());
+ transport.setUseLocalHost(options.isUseLocalHost());
+ transport.connecting(host, options.getLocalAddress());
+
+ transport.setTransportListener(new DefaultTransportListener(){
+ public void onTransportConnected() {
+ if(state==CONNECTING) {
+ state = CONNECTED;
+ onConnect.onSuccess(null);
+ transport.resumeRead();
+ }
+ }
+
+ public void onTransportFailure(final IOException error) {
+ if(state==CONNECTING) {
+ onConnect.onFailure(error);
+ }
+ }
+
+ });
+ transport.connecting(host, options.getLocalAddress());
+ bind(transport);
+ transport.start(NOOP);
+ }
+
+ class SaslClientHandler extends ChainedCallback<Void, Void> {
+
+ private final AmqpConnectOptions options;
+
+ public SaslClientHandler(AmqpConnectOptions options, Callback<Void> next) {
+ super(next);
+ this.options = options;
+ }
+
+ public void onSuccess(final Void value) {
+ final Sasl s = protonTransport.sasl();
+ s.client();
+ pumpOut();
+ hawtdispatchTransport.setTransportListener(new AmqpTransportListener() {
+
+ Sasl sasl = s;
+
+ @Override
+ void process() {
+ if (sasl != null) {
+ sasl = processSaslEvent(sasl);
+ if (sasl == null) {
+ // once sasl handshake is done.. we need to read the protocol header again.
+ ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+ }
+ }
+ }
+
+ @Override
+ public void onTransportFailure(IOException error) {
+ next.onFailure(error);
+ }
+
+ @Override
+ void onFailure(Throwable error) {
+ next.onFailure(error);
+ }
+
+ boolean authSent = false;
+
+ private Sasl processSaslEvent(Sasl sasl) {
+ if (sasl.getOutcome() == Sasl.SaslOutcome.PN_SASL_OK) {
+ next.onSuccess(null);
+ return null;
+ }
+ HashSet<String> mechanisims = new HashSet<String>(Arrays.asList(sasl.getRemoteMechanisms()));
+ if (!authSent && !mechanisims.isEmpty()) {
+ if (!mechanisims.contains("PLAIN")) {
+ next.onFailure(Support.illegalState("Remote does not support plain password authentication."));
+ return null;
+ }
+ authSent = true;
+ DataByteArrayOutputStream os = new DataByteArrayOutputStream();
+ try {
+ os.write(new UTF8Buffer(options.getUser()));
+ os.writeByte(0);
+ if (options.getPassword() != null) {
+ os.write(new UTF8Buffer(options.getPassword()));
+ os.writeByte(0);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Buffer buffer = os.toBuffer();
+ sasl.setMechanisms(new String[]{"PLAIN"});
+ sasl.send(buffer.data, buffer.offset, buffer.length);
+ }
+ return sasl;
+ }
+ });
+ }
+ }
+
+ class SaslServerListener extends AmqpTransportListener {
+ Sasl sasl;
+
+ @Override
+ public void onTransportCommand(Object command) {
+ try {
+ if (command.getClass() == AmqpHeader.class) {
+ AmqpHeader header = (AmqpHeader)command;
+ switch( header.getProtocolId() ) {
+ case 3: // Client will be using SASL for auth..
+ if( listener!=null ) {
+ sasl = listener.processSaslConnect(protonTransport);
+ break;
+ }
+ default:
+ AmqpTransportListener listener = new AmqpTransportListener();
+ hawtdispatchTransport.setTransportListener(listener);
+ listener.onTransportCommand(command);
+ return;
+ }
+ command = header.getBuffer();
+ }
+ } catch (Exception e) {
+ onFailure(e);
+ }
+ super.onTransportCommand(command);
+ }
+
+ @Override
+ void process() {
+ if (sasl != null) {
+ sasl = listener.processSaslEvent(sasl);
+ }
+ if (sasl == null) {
+ // once sasl handshake is done.. we need to read the protocol header again.
+ ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+ hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
+ }
+ }
+ }
+
+ static public AmqpTransport accept(Transport transport) {
+ return new AmqpTransport(transport.getDispatchQueue()).accepted(transport);
+ }
+
+ private AmqpTransport accepted(final Transport transport) {
+ state = CONNECTED;
+ bind(transport);
+ hawtdispatchTransport.setTransportListener(new SaslServerListener());
+ return this;
+ }
+
+ private void bind(final Transport transport) {
+ this.hawtdispatchTransport = transport;
+ this.protonTransport = new TransportImpl();
+ this.protonTransport.bind(connection);
+ if( transport.getProtocolCodec()==null ) {
+ try {
+ transport.setProtocolCodec(new AmqpProtocolCodec());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void defer(Defer defer) {
+ if( !defer.defered ) {
+ defer.defered = true;
+ defers.merge(defer);
+ }
+ }
+
+ public void pumpOut() {
+ assertExecuting();
+ defer(deferedPumpOut);
+ }
+
+ private Defer deferedPumpOut = new Defer() {
+ public void run() {
+ doPumpOut();
+ }
+ };
+
+ private void doPumpOut() {
+ switch(state) {
+ case CONNECTING:
+ case CONNECTED:
+ break;
+ default:
+ return;
+ }
+
+ int size = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize();
+ byte data[] = new byte[size];
+ boolean done = false;
+ int pumped = 0;
+ while( !done && !hawtdispatchTransport.full() ) {
+ int count = protonTransport.output(data, 0, size);
+ if( count > 0 ) {
+ pumped += count;
+ boolean accepted = hawtdispatchTransport.offer(new Buffer(data, 0, count));
+ assert accepted: "Should be accepted since the transport was not full";
+ } else {
+ done = true;
+ }
+ }
+ if( pumped > 0 && !hawtdispatchTransport.full() ) {
+ listener.processRefill();
+ }
+ }
+
+ public Sasl sasl;
+ public void fireListenerEvents() {
+ fireWatches();
+
+ if( sasl!=null ) {
+ sasl = listener.processSaslEvent(sasl);
+ if( sasl==null ) {
+ // once sasl handshake is done.. we need to read the protocol header again.
+ ((AmqpProtocolCodec)this.hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+ }
+ }
+
+ context(connection).fireListenerEvents(listener);
+
+ Session session = connection.sessionHead(ALL_SET, ALL_SET);
+ while(session != null)
+ {
+ context(session).fireListenerEvents(listener);
+ session = session.next(ALL_SET, ALL_SET);
+ }
+
+ Link link = connection.linkHead(ALL_SET, ALL_SET);
+ while(link != null)
+ {
+ context(link).fireListenerEvents(listener);
+ link = link.next(ALL_SET, ALL_SET);
+ }
+
+ Delivery delivery = connection.getWorkHead();
+ while(delivery != null)
+ {
+ listener.processDelivery(delivery);
+ delivery = delivery.getWorkNext();
+ }
+
+ listener.processRefill();
+ }
+
+
+ public ConnectionImpl connection() {
+ return connection;
+ }
+
+ AmqpListener listener = new AmqpListener();
+ public AmqpListener getListener() {
+ return listener;
+ }
+
+ public void setListener(AmqpListener listener) {
+ this.listener = listener;
+ }
+
+ public EndpointContext context(Endpoint endpoint) {
+ EndpointContext context = (EndpointContext) endpoint.getContext();
+ if( context == null ) {
+ context = new EndpointContext(this, endpoint);
+ endpoint.setContext(context);
+ }
+ return context;
+ }
+
+ class AmqpTransportListener extends DefaultTransportListener {
+
+ @Override
+ public void onTransportConnected() {
+ if( listener!=null ) {
+ listener.processTransportConnected();
+ }
+ }
+
+ @Override
+ public void onRefill() {
+ if( listener!=null ) {
+ listener.processRefill();
+ }
+ }
+
+ @Override
+ public void onTransportCommand(Object command) {
+ if( state != CONNECTED ) {
+ return;
+ }
+ try {
+ Buffer buffer;
+ if (command.getClass() == AmqpHeader.class) {
+ buffer = ((AmqpHeader) command).getBuffer();
+ } else {
+ buffer = (Buffer) command;
+ }
+ protonTransport.input(buffer.data, buffer.offset, buffer.length);
+ process();
+ pumpOut();
+ } catch (Exception e) {
+ onFailure(e);
+ }
+ }
+
+ void process() {
+ fireListenerEvents();
+ }
+
+ @Override
+ public void onTransportFailure(IOException error) {
+ if( state==CONNECTED ) {
+ failure = error;
+ if( listener!=null ) {
+ listener.processTransportFailure(error);
+ fireWatches();
+ }
+ }
+ }
+
+ void onFailure(Throwable error) {
+ failure = error;
+ if( listener!=null ) {
+ listener.processFailure(error);
+ fireWatches();
+ }
+ }
+ }
+
+ public void disconnect() {
+ assertExecuting();
+ if( state == CONNECTING || state==CONNECTED) {
+ state = DISCONNECTING;
+ if( hawtdispatchTransport!=null ) {
+ hawtdispatchTransport.stop(new Task(){
+ public void run() {
+ state = DISCONNECTED;
+ hawtdispatchTransport = null;
+ protonTransport = null;
+ fireWatches();
+ }
+ });
+ }
+ }
+ }
+
+ public DispatchQueue queue() {
+ return queue;
+ }
+
+ public void assertExecuting() {
+ queue().assertExecuting();
+ }
+
+ public void onTransportConnected(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if( failure !=null ) {
+ cb.onFailure(failure);
+ return true;
+ }
+ if( state!=CONNECTING ) {
+ cb.onSuccess(null);
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ public void onTransportDisconnected(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if( state==DISCONNECTED ) {
+ cb.onSuccess(null);
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ public void onTransportFailure(final Callback<Throwable> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if( failure!=null ) {
+ cb.onSuccess(failure);
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ public Throwable getFailure() {
+ return failure;
+ }
+
+ public void setProtocolTracer(ProtocolTracer protocolTracer) {
+ protonTransport.setProtocolTracer(protocolTracer);
+ }
+
+ public ProtocolTracer getProtocolTracer() {
+ return protonTransport.getProtocolTracer();
+ }
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class Defer extends Task {
+ boolean defered;
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import java.nio.ByteBuffer;
+
+public class DroppingWritableBuffer implements WritableBuffer
+{
+ int pos = 0;
+
+ @Override
+ public boolean hasRemaining() {
+ return true;
+ }
+
+ @Override
+ public void put(byte b) {
+ pos += 1;
+ }
+
+ @Override
+ public void putFloat(float f) {
+ pos += 4;
+ }
+
+ @Override
+ public void putDouble(double d) {
+ pos += 8;
+ }
+
+ @Override
+ public void put(byte[] src, int offset, int length) {
+ pos += length;
+ }
+
+ @Override
+ public void putShort(short s) {
+ pos += 2;
+ }
+
+ @Override
+ public void putInt(int i) {
+ pos += 4;
+ }
+
+ @Override
+ public void putLong(long l) {
+ pos += 8;
+ }
+
+ @Override
+ public int remaining() {
+ return limit() - pos;
+ }
+
+ @Override
+ public int position() {
+ return pos;
+ }
+
+ int resetPos;
+ @Override
+ public void position(int position) {
+ resetPos = pos;
+ pos = position;
+ }
+
+ @Override
+ public void put(ByteBuffer payload) {
+ pos += payload.remaining();
+ payload.position(payload.limit());
+ }
+
+ public int limit() {
+ return Integer.MAX_VALUE;
+ }
+
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class EndpointContext {
+
+ private final AmqpTransport transport;
+ private final Endpoint endpoint;
+ private Object attachment;
+ boolean listenerProcessing;
+
+ public EndpointContext(AmqpTransport transport, Endpoint endpoint) {
+ this.transport = transport;
+ this.endpoint = endpoint;
+ }
+
+ class ProcessedTask extends Task {
+ @Override
+ public void run() {
+ transport.assertExecuting();
+ listenerProcessing = false;
+ transport.pumpOut();
+ }
+ }
+
+ public void fireListenerEvents(AmqpListener listener) {
+ if( listener!=null && !listenerProcessing ) {
+ if( endpoint.getLocalState() == EndpointState.UNINITIALIZED &&
+ endpoint.getRemoteState() != EndpointState.UNINITIALIZED ) {
+ listenerProcessing = true;
+ listener.processRemoteOpen(endpoint, new ProcessedTask());
+ } else if( endpoint.getLocalState() == EndpointState.ACTIVE &&
+ endpoint.getRemoteState() == EndpointState.CLOSED ) {
+ listenerProcessing = true;
+ listener.processRemoteClose(endpoint, new ProcessedTask());
+ }
+ }
+ if( attachment !=null && attachment instanceof Task ) {
+ ((Task) attachment).run();
+ }
+ }
+
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ public <T> T getAttachment(Class<T> clazz) {
+ return clazz.cast(getAttachment());
+ }
+
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Support {
+
+ public static IllegalStateException illegalState(String msg) {
+ return (IllegalStateException) new IllegalStateException(msg).fillInStackTrace();
+ }
+
+ public static IllegalStateException createUnhandledEventError() {
+ return illegalState("Unhandled event.");
+ }
+
+ public static IllegalStateException createListenerNotSetError() {
+ return illegalState("No connection listener set to handle message received from the server.");
+ }
+
+ public static IllegalStateException createDisconnectedError() {
+ return illegalState("Disconnected");
+ }
+
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public abstract class Watch {
+ /* returns true if the watch has been triggered */
+ public abstract boolean execute();
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.Task;
+
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class WatchBase {
+
+ private LinkedList<Watch> watches = new LinkedList<Watch>();
+ protected void addWatch(final Watch task) {
+ watches.add(task);
+ fireWatches();
+ }
+
+ protected void fireWatches() {
+ if( !this.watches.isEmpty() ) {
+ Dispatch.getCurrentQueue().execute(new Task(){
+ @Override
+ public void run() {
+ // Lets see if any of the watches are triggered.
+ LinkedList<Watch> tmp = watches;
+ watches = new LinkedList<Watch>();
+ for (Watch task : tmp) {
+ if( !task.execute() ) {
+ watches.add(task);
+ }
+ }
+ }
+ });
+ }
+ }
+
+}
Modified: qpid/proton/trunk/proton-j/pom.xml
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/pom.xml?rev=1414946&r1=1414945&r2=1414946&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/pom.xml (original)
+++ qpid/proton/trunk/proton-j/pom.xml Wed Nov 28 21:48:33 2012
@@ -65,6 +65,7 @@
<modules>
<module>proton</module>
<module>contrib/proton-jms</module>
+ <module>contrib/proton-hawtdispatch</module>
</modules>
</project>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org