You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by fa...@apache.org on 2014/09/06 13:23:14 UTC
svn commit: r1622849 [2/9] - in
/qpid/proton/branches/fadams-javascript-binding: ./ contrib/
contrib/proton-hawtdispatch/ contrib/proton-hawtdispatch/src/
contrib/proton-hawtdispatch/src/main/
contrib/proton-hawtdispatch/src/main/java/ contrib/proton-h...
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * <p>
+ * Base class for a {@link Callback} that is chained in front of another
+ * callback. Failures are handed unchanged to {@link #next}; this class does
+ * not implement success handling, so concrete subclasses must supply it.
+ * </p>
+ *
+ * @param <In> type of the value this callback receives
+ * @param <Out> type of the value accepted by the downstream callback
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class ChainedCallback<In,Out> implements Callback<In> {
+
+    /** Downstream callback; every failure reported to this callback is forwarded to it. */
+    public final Callback<Out> next;
+
+    /**
+     * @param next the downstream callback that failures are forwarded to
+     */
+    public ChainedCallback(Callback<Out> next) {
+        this.next = next;
+    }
+
+    /**
+     * Forwards the failure to {@link #next} without modification.
+     *
+     * @param value the cause of the failure
+     */
+    public void onFailure(Throwable value) {
+        this.next.onFailure(value);
+    }
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * Abstract base class that declares a single hook,
+ * {@code processDelivery(Delivery)}.
+ *
+ * <p>The hook is abstract and package-private, so only classes located in this
+ * package can provide an implementation.</p>
+ *
+ * <p>NOTE(review): the code that invokes the hook is not part of this file;
+ * judging by the class name, instances are presumably attached to a delivery
+ * or link context - confirm against the callers.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class DeliveryAttachment {
+ abstract void processDelivery(Delivery delivery); // hook implemented by subclasses; receives the delivery to handle
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>A simplified Future function results interface.</p>
+ *
+ * <p>Operations:</p>
+ * <ul>
+ * <li>{@code await()} - obtains the result; declared to throw
+ * {@code Exception}.</li>
+ * <li>{@code await(long, TimeUnit)} - same as {@code await()}, but takes an
+ * amount and a unit that bound the wait.</li>
+ * <li>{@code then(Callback)} - registers a callback for the result.</li>
+ * </ul>
+ *
+ * <p>In the {@code Promise} implementation of this package, {@code await()}
+ * blocks until the promise is completed and then either returns the value or
+ * rethrows the recorded failure; the timed variant throws a
+ * {@code java.util.concurrent.TimeoutException} when the wait elapses first;
+ * and {@code then} invokes the callback right away if the promise is already
+ * completed. Other implementations may differ.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Future<T> {
+ T await() throws Exception; // result, or the failure rethrown
+ T await(long amount, TimeUnit unit) throws Exception; // as above, with a bound on the wait
+ void then(Callback<T> callback); // asynchronous notification of the outcome
+
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class MessageDelivery extends WatchBase {
+
+ final int initialSize;
+ private Message message;
+ private Buffer encoded;
+ public Delivery delivery;
+ private int sizeHint = 32;
+
+ static Buffer encode(Message message, int sizeHint) {
+ byte[] buffer = new byte[sizeHint];
+ int size = ((ProtonJMessage)message).encode2(buffer, 0, sizeHint);
+ if( size > sizeHint ) {
+ buffer = new byte[size];
+ size = message.encode(buffer, 0, size);
+ }
+ return new Buffer(buffer, 0, size);
+ }
+
+ static Message decode(Buffer buffer) {
+ Message msg = Message.Factory.create();
+ int offset = buffer.offset;
+ int len = buffer.length;
+ while( len > 0 ) {
+ int decoded = msg.decode(buffer.data, offset, len);
+ assert decoded > 0: "Make progress decoding the message";
+ offset += decoded;
+ len -= decoded;
+ }
+ return msg;
+ }
+
+ public MessageDelivery(Message message) {
+ this(message, encode(message, 32));
+ }
+
+ public MessageDelivery(Buffer encoded) {
+ this(null, encoded);
+ }
+
+ public MessageDelivery(Message message, Buffer encoded) {
+ this.message = message;
+ this.encoded = encoded;
+ sizeHint = this.encoded.length;
+ initialSize = sizeHint;
+ }
+
+ public Message getMessage() {
+ if( message == null ) {
+ message = decode(encoded);
+ }
+ return message;
+ }
+
+ public Buffer encoded() {
+ if( encoded == null ) {
+ encoded = encode(message, sizeHint);
+ sizeHint = encoded.length;
+ }
+ return encoded;
+ }
+
+ public boolean isSettled() {
+ return delivery!=null && delivery.isSettled();
+ }
+
+ public DeliveryState getRemoteState() {
+ return delivery==null ? null : delivery.getRemoteState();
+ }
+
+ public DeliveryState getLocalState() {
+ return delivery==null ? null : delivery.getLocalState();
+ }
+
+ public void onEncoded(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if( delivery!=null ) {
+ cb.onSuccess(null);
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ /**
+ * @return the remote delivery state when it changes.
+ * @throws Exception
+ */
+ public DeliveryState getRemoteStateChange() throws Exception {
+ AmqpEndpointBase.assertNotOnDispatchQueue();
+ return getRemoteStateChangeFuture().await();
+ }
+
+ /**
+ * @return the future remote delivery state when it changes.
+ */
+ public Future<DeliveryState> getRemoteStateChangeFuture() {
+ final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+ link().queue().execute(new Task() {
+ @Override
+ public void run() {
+ onRemoteStateChange(rc);
+ }
+ });
+ return rc;
+ }
+
+ abstract AmqpLink link();
+
+ boolean watchingRemoteStateChange;
+ public void onRemoteStateChange(final Callback<DeliveryState> cb) {
+ watchingRemoteStateChange = true;
+ final DeliveryState original = delivery.getRemoteState();
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if (original == null) {
+ if( delivery.getRemoteState()!=null ) {
+ cb.onSuccess(delivery.getRemoteState());
+ watchingRemoteStateChange = false;
+ return true;
+ }
+ } else {
+ if( !original.equals(delivery.getRemoteState()) ) {
+ cb.onSuccess(delivery.getRemoteState());
+ watchingRemoteStateChange = false;
+ return true;
+ }
+ }
+ return false;
+ }
+ });
+ }
+
+ /**
+ * @return the remote delivery state once settled.
+ * @throws Exception
+ */
+ public DeliveryState getSettle() throws Exception {
+ AmqpEndpointBase.assertNotOnDispatchQueue();
+ return getSettleFuture().await();
+ }
+
+ /**
+ * @return the future remote delivery state once the delivery is settled.
+ */
+ public Future<DeliveryState> getSettleFuture() {
+ final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+ link().queue().execute(new Task() {
+ @Override
+ public void run() {
+ onSettle(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onSettle(final Callback<DeliveryState> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if( delivery!=null && delivery.isSettled() ) {
+ cb.onSuccess(delivery.getRemoteState());
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ @Override
+ protected void fireWatches() {
+ super.fireWatches();
+ }
+
+ void incrementDeliveryCount() {
+ Message msg = getMessage();
+ msg.setDeliveryCount(msg.getDeliveryCount()+1);
+ encoded = null;
+ }
+
+ public void redeliver(boolean incrementDeliveryCounter) {
+ if( incrementDeliveryCounter ) {
+ incrementDeliveryCount();
+ }
+ }
+
+ public void settle() {
+ if( !delivery.isSettled() ) {
+ delivery.settle();
+ }
+ }
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Promise<T> implements Callback<T>, Future<T> {
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+ Callback<T> next;
+ Throwable error;
+ T value;
+
+ public void onFailure(Throwable value) {
+ Callback<T> callback = null;
+ synchronized(this) {
+ error = value;
+ latch.countDown();
+ callback = next;
+ }
+ if( callback!=null ) {
+ callback.onFailure(value);
+ }
+ }
+
+ public void onSuccess(T value) {
+ Callback<T> callback = null;
+ synchronized(this) {
+ this.value = value;
+ latch.countDown();
+ callback = next;
+ }
+ if( callback!=null ) {
+ callback.onSuccess(value);
+ }
+ }
+
+ public void then(Callback<T> callback) {
+ boolean fire = false;
+ synchronized(this) {
+ next = callback;
+ if( latch.getCount() == 0 ) {
+ fire = true;
+ }
+ }
+ if( fire ) {
+ if( error!=null ) {
+ callback.onFailure(error);
+ } else {
+ callback.onSuccess(value);
+ }
+ }
+ }
+
+ public T await(long amount, TimeUnit unit) throws Exception {
+ if( latch.await(amount, unit) ) {
+ return get();
+ } else {
+ throw new TimeoutException();
+ }
+ }
+
+ public T await() throws Exception {
+ latch.await();
+ return get();
+ }
+
+ private T get() throws Exception {
+ Throwable e = error;
+ if( e !=null ) {
+ if( e instanceof RuntimeException ) {
+ throw (RuntimeException) e;
+ } else if( e instanceof Exception) {
+ throw (Exception) e;
+ } else if( e instanceof Error) {
+ throw (Error) e;
+ } else {
+ // don't expect to hit this case.
+ throw new RuntimeException(e);
+ }
+ }
+ return value;
+ }
+
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * Quality-of-service levels: {@code AT_MOST_ONCE}, {@code AT_LEAST_ONCE} and
+ * {@code EXACTLY_ONCE}.
+ *
+ * <p>NOTE(review): the code that interprets these constants is not part of
+ * this file. The names match the usual messaging delivery guarantees; the
+ * per-constant notes below are inferred from the names only - confirm
+ * against the sender/receiver code.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public enum QoS {
+ AT_MOST_ONCE, // presumably: a message may be lost but is never duplicated
+ AT_LEAST_ONCE, // presumably: a message is never lost but may be duplicated
+ EXACTLY_ONCE // presumably: a message is neither lost nor duplicated
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+* States of a transport, declared in the order {@code CREATED},
+* {@code CONNECTING}, {@code CONNECTED}, {@code DISCONNECTING},
+* {@code DISCONNECTED}.
+*
+* <p>NOTE(review): the code that moves a transport between these states is
+* not part of this file; the per-constant notes below are inferred from the
+* names only - confirm against the transport implementation.</p>
+*
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public enum TransportState {
+ CREATED, // presumably: constructed, no connection attempt made yet
+ CONNECTING, // presumably: connection attempt in progress
+ CONNECTED, // presumably: connection established
+ DISCONNECTING, // presumably: shutdown in progress
+ DISCONNECTED // presumably: connection closed
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * Accessor for an 8-byte AMQP protocol header: the four bytes
+ * {@code 'A','M','Q','P'} followed by one byte each for the protocol id, the
+ * major version, the minor version and the revision.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpHeader {
+
+    static final Buffer PREFIX = new Buffer(new byte[]{
+        'A', 'M', 'Q', 'P'
+    });
+
+    /** Positions of the individual fields, relative to the buffer start. */
+    private static final int PROTOCOL_ID_INDEX = 4;
+    private static final int MAJOR_INDEX = 5;
+    private static final int MINOR_INDEX = 6;
+    private static final int REVISION_INDEX = 7;
+
+    private Buffer buffer;
+
+    /**
+     * Creates a header with protocol id 0, major 1, minor 0 and revision 0.
+     */
+    public AmqpHeader(){
+        this(new Buffer(new byte[]{
+            'A', 'M', 'Q', 'P', 0, 1, 0, 0
+        }));
+    }
+
+    /**
+     * @param buffer the header bytes; validated by {@link #setBuffer(Buffer)}
+     */
+    public AmqpHeader(Buffer buffer){
+        setBuffer(buffer);
+    }
+
+    /** @return the protocol id as an unsigned byte value */
+    public int getProtocolId() {
+        return octetAt(PROTOCOL_ID_INDEX);
+    }
+
+    /** @param value the protocol id; narrowed to a byte */
+    public void setProtocolId(int value) {
+        putOctet(PROTOCOL_ID_INDEX, value);
+    }
+
+    /** @return the major version as an unsigned byte value */
+    public int getMajor() {
+        return octetAt(MAJOR_INDEX);
+    }
+
+    /** @param value the major version; narrowed to a byte */
+    public void setMajor(int value) {
+        putOctet(MAJOR_INDEX, value);
+    }
+
+    /** @return the minor version as an unsigned byte value */
+    public int getMinor() {
+        return octetAt(MINOR_INDEX);
+    }
+
+    /** @param value the minor version; narrowed to a byte */
+    public void setMinor(int value) {
+        putOctet(MINOR_INDEX, value);
+    }
+
+    /** @return the revision as an unsigned byte value */
+    public int getRevision() {
+        return octetAt(REVISION_INDEX);
+    }
+
+    /** @param value the revision; narrowed to a byte */
+    public void setRevision(int value) {
+        putOctet(REVISION_INDEX, value);
+    }
+
+    /** @return the buffer holding the header bytes */
+    public Buffer getBuffer() {
+        return buffer;
+    }
+
+    /**
+     * Replaces the header bytes.
+     *
+     * @param value a buffer of exactly 8 bytes that starts with {@code AMQP}
+     * @throws IllegalArgumentException if the buffer does not have that form
+     */
+    public void setBuffer(Buffer value) {
+        if( value.startsWith(PREFIX) && value.length()==8 ) {
+            buffer = value.buffer();
+            return;
+        }
+        throw new IllegalArgumentException("Not an AMQP header buffer");
+    }
+
+    /** Reads the byte at the given header position as an unsigned value. */
+    private int octetAt(int index) {
+        return buffer.get(index) & 0xFF;
+    }
+
+    /** Writes the low-order byte of the value at the given header position. */
+    private void putOctet(int index, int value) {
+        buffer.data[buffer.offset+index] = (byte) value;
+    }
+
+    @Override
+    public String toString() {
+        return buffer.toString();
+    }
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.*;
+import org.fusesource.hawtdispatch.Task;
+
+import java.io.IOException;
+
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class AmqpListener {
+
+ public Sasl processSaslConnect(ProtonJTransport protonTransport) {
+ return null;
+ }
+
+ public Sasl processSaslEvent(Sasl sasl) {
+ return sasl;
+ }
+
+ public void processRemoteOpen(Endpoint endpoint, Task onComplete) {
+ ErrorCondition condition = endpoint.getCondition();
+ condition.setCondition(Symbol.valueOf("error"));
+ condition.setDescription("Not supported");
+ endpoint.close();
+ onComplete.run();
+ }
+
+ public void processRemoteClose(Endpoint endpoint, Task onComplete) {
+ endpoint.close();
+ onComplete.run();
+ }
+
+ public void processDelivery(Delivery delivery){
+ }
+
+ public void processTransportConnected() {
+ }
+
+ public void processTransportFailure(IOException e) {
+ this.processFailure(e);
+ }
+
+ public void processFailure(Throwable e) {
+ e.printStackTrace();
+ }
+
+ public void processRefill() {
+ }
+
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec;
+
+import java.io.IOException;
+
+/**
+ * A HawtDispatch protocol codec that encodes/decodes AMQP 1.0 frames.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpProtocolCodec extends AbstractProtocolCodec {
+
+ int maxFrameSize = 4*1024*1024;
+
+ @Override
+ protected void encode(Object object) throws IOException {
+ nextWriteBuffer.write((Buffer) object);
+ }
+
+ @Override
+ protected Action initialDecodeAction() {
+ return new Action() {
+ public Object apply() throws IOException {
+ Buffer magic = readBytes(8);
+ if (magic != null) {
+ nextDecodeAction = readFrameSize;
+ return new AmqpHeader(magic);
+ } else {
+ return null;
+ }
+ }
+ };
+ }
+
+ private final Action readFrameSize = new Action() {
+ public Object apply() throws IOException {
+ Buffer sizeBytes = peekBytes(4);
+ if (sizeBytes != null) {
+ int size = sizeBytes.bigEndianEditor().readInt();
+ if (size < 8) {
+ throw new IOException(String.format("specified frame size %d is smaller than minimum frame size", size));
+ }
+ if( size > maxFrameSize ) {
+ throw new IOException(String.format("specified frame size %d is larger than maximum frame size", size));
+ }
+
+ // TODO: check frame min and max size..
+ nextDecodeAction = readFrame(size);
+ return nextDecodeAction.apply();
+ } else {
+ return null;
+ }
+ }
+ };
+
+
+ private final Action readFrame(final int size) {
+ return new Action() {
+ public Object apply() throws IOException {
+ Buffer frameData = readBytes(size);
+ if (frameData != null) {
+ nextDecodeAction = readFrameSize;
+ return frameData;
+ } else {
+ return null;
+ }
+ }
+ };
+ }
+
+ public int getReadBytesPendingDecode() {
+ return readBuffer.position() - readStart;
+ }
+
+ public void skipProtocolHeader() {
+ nextDecodeAction = readFrameSize;
+ }
+
+ public void readProtocolHeader() {
+ nextDecodeAction = initialDecodeAction();
+ }
+
+ public int getMaxFrameSize() {
+ return maxFrameSize;
+ }
+
+ public void setMaxFrameSize(int maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
+ }
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,587 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.hawtdispatch.api.AmqpConnectOptions;
+import org.apache.qpid.proton.hawtdispatch.api.Callback;
+import org.apache.qpid.proton.hawtdispatch.api.ChainedCallback;
+import org.apache.qpid.proton.hawtdispatch.api.TransportState;
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.ByteBufferUtils;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtdispatch.*;
+import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
+import org.fusesource.hawtdispatch.transport.SslTransport;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+import org.fusesource.hawtdispatch.transport.Transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedList;
+
+import static org.apache.qpid.proton.hawtdispatch.api.TransportState.*;
+import static org.fusesource.hawtdispatch.Dispatch.NOOP;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpTransport extends WatchBase {
+
+ private TransportState state = CREATED; // lifecycle state of this transport; starts out as CREATED
+
+ final DispatchQueue queue; // dispatch queue handed to the constructor; assertExecuting() checks against it
+ final ProtonJConnection connection; // proton connection created in the constructor
+ Transport hawtdispatchTransport; // HawtDispatch transport; assigned in bind(), cleared when a disconnect completes
+ ProtonJTransport protonTransport; // proton engine transport; assigned in bind(), cleared when a disconnect completes
+ Throwable failure; // last failure recorded by the connect/transport callbacks; null if none was recorded
+ CustomDispatchSource<Defer,LinkedList<Defer>> defers; // collects the Defer tasks merged by defer(); drained by the handler set up in the constructor
+
+ public static final EnumSet<EndpointState> ALL_SET = EnumSet.allOf(EndpointState.class); // every endpoint state; used as the filter when walking sessions and links
+
+ /**
+  * Creates a transport that uses the given dispatch queue. Besides creating
+  * the proton connection this sets up the {@code defers} source: each
+  * {@link Defer} merged into it is later handed to the event handler below,
+  * which clears the task's {@code defered} flag and then runs it.
+  *
+  * @param queue the dispatch queue the deferred tasks are delivered on
+  */
+ private AmqpTransport(DispatchQueue queue) {
+     this.queue = queue;
+     this.connection = (ProtonJConnection) Connection.Factory.create();
+
+     defers = Dispatch.createSource(EventAggregators.<Defer>linkedList(), this.queue);
+     defers.setEventHandler(new Task(){
+         public void run() {
+             for( Defer defer: defers.getData() ) {
+                 // This used to read "assert defer.defered = true": an assignment, so the
+                 // check could never fail and changed state when assertions were enabled.
+                 assert defer.defered : "only tasks queued through defer() should reach this handler";
+                 defer.defered = false;
+                 defer.run();
+             }
+         }
+     });
+     defers.resume();
+ }
+
+ /**
+  * Creates a transport and starts connecting it. The options are cloned
+  * first; a missing dispatch queue or blocking executor is filled in on
+  * the clone, so the caller's object is not modified here.
+  *
+  * @param options the connect options
+  * @return the new transport
+  */
+ static public AmqpTransport connect(AmqpConnectOptions options) {
+     final AmqpConnectOptions effective = options.clone();
+     if( effective.getDispatchQueue() == null ) {
+         effective.setDispatchQueue(Dispatch.createQueue());
+     }
+     if( effective.getBlockingExecutor() == null ) {
+         effective.setBlockingExecutor(AmqpConnectOptions.getBlockingThreadPool());
+     }
+     final AmqpTransport created = new AmqpTransport(effective.getDispatchQueue());
+     return created.connecting(effective);
+ }
+
+ private AmqpTransport connecting(final AmqpConnectOptions options) {
+ assert state == CREATED;
+ try {
+ state = CONNECTING;
+ if( options.getLocalContainerId()!=null ) {
+ connection.setLocalContainerId(options.getLocalContainerId());
+ }
+ if( options.getRemoteContainerId()!=null ) {
+ connection.setContainer(options.getRemoteContainerId());
+ }
+ connection.setHostname(options.getHost().getHost());
+ Callback<Void> onConnect = new Callback<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ if( state == CONNECTED ) {
+ hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
+ fireWatches();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable value) {
+ if( state == CONNECTED || state == CONNECTING ) {
+ failure = value;
+ disconnect();
+ fireWatches();
+ }
+ }
+ };
+ if( options.getUser()!=null ) {
+ onConnect = new SaslClientHandler(options, onConnect);
+ }
+ createTransport(options, onConnect);
+ } catch (Throwable e) {
+ failure = e;
+ }
+ fireWatches();
+ return this;
+ }
+
+ /** @return the current lifecycle state of this transport */
+ public TransportState getState() {
+     return this.state;
+ }
+
+ /**
+  * Creates and starts a transport to the AMQP server and reports the outcome
+  * to {@code onConnect}.
+  * <p>
+  * An {@link SslTransport} is used when the options carry an SSL context,
+  * otherwise a plain {@link TcpTransport}. When the host URI names no port,
+  * the registered default is applied: 5671 for AMQP over SSL/TLS and 5672 for
+  * plain AMQP.
+  *
+  * @param options connection settings (host, SSL context, socket tuning, executors)
+  * @param onConnect receives {@code onSuccess(null)} when the transport connects
+  *        while this object is CONNECTING, or {@code onFailure} when the
+  *        transport fails during that phase
+  * @throws Exception if the host URI cannot be rebuilt or the transport set-up fails
+  */
+ void createTransport(AmqpConnectOptions options, final Callback<Void> onConnect) throws Exception {
+     final boolean secure = options.getSslContext() != null;
+
+     final TcpTransport transport;
+     if( secure ) {
+         SslTransport ssl = new SslTransport();
+         ssl.setSSLContext(options.getSslContext());
+         transport = ssl;
+     } else {
+         transport = new TcpTransport();
+     }
+
+     URI host = options.getHost();
+     if( host.getPort() == -1 ) {
+         // The two defaults used to be swapped (5672 for SSL, 5671 for plain).
+         final int defaultPort = secure ? 5671 : 5672;
+         host = new URI(host.getScheme()+"://"+host.getHost()+":"+defaultPort);
+     }
+
+     transport.setBlockingExecutor(options.getBlockingExecutor());
+     transport.setDispatchQueue(options.getDispatchQueue());
+
+     transport.setMaxReadRate(options.getMaxReadRate());
+     transport.setMaxWriteRate(options.getMaxWriteRate());
+     transport.setReceiveBufferSize(options.getReceiveBufferSize());
+     transport.setSendBufferSize(options.getSendBufferSize());
+     transport.setTrafficClass(options.getTrafficClass());
+     transport.setUseLocalHost(options.isUseLocalHost());
+     // Issued exactly once; the original repeated this call after installing the listener.
+     // NOTE(review): a repeated call presumably re-initialises the socket - confirm against TcpTransport.
+     transport.connecting(host, options.getLocalAddress());
+
+     transport.setTransportListener(new DefaultTransportListener(){
+         public void onTransportConnected() {
+             if(state==CONNECTING) {
+                 state = CONNECTED;
+                 onConnect.onSuccess(null);
+                 transport.resumeRead();
+             }
+         }
+
+         public void onTransportFailure(final IOException error) {
+             if(state==CONNECTING) {
+                 onConnect.onFailure(error);
+             }
+         }
+
+     });
+     bind(transport);
+     transport.start(NOOP);
+ }
+
+ /**
+  * Connect callback that performs the client side of the SASL exchange
+  * before passing success on to the next callback. PLAIN is preferred when
+  * the remote offers it, ANONYMOUS is the fallback; any other offer is
+  * reported to the next callback as a failure.
+  */
+ class SaslClientHandler extends ChainedCallback<Void, Void> {
+
+     private final AmqpConnectOptions options;
+
+     /**
+      * @param options source of the user name and password
+      * @param next callback notified about the outcome of the exchange
+      */
+     public SaslClientHandler(AmqpConnectOptions options, Callback<Void> next) {
+         super(next);
+         this.options = options;
+     }
+
+     /**
+      * Puts the proton transport's SASL layer into client mode, schedules an
+      * output pump and installs a listener that drives the exchange.
+      */
+     public void onSuccess(final Void value) {
+         final Sasl clientSasl = protonTransport.sasl();
+         clientSasl.client();
+         pumpOut();
+         hawtdispatchTransport.setTransportListener(new AmqpTransportListener() {
+
+             // set to null once the exchange has finished (successfully or not)
+             Sasl sasl = clientSasl;
+
+             // true once an initial response has been sent
+             boolean authSent = false;
+
+             @Override
+             void process() {
+                 if (sasl == null) {
+                     return;
+                 }
+                 sasl = processSaslEvent(sasl);
+                 if (sasl == null) {
+                     // once sasl handshake is done.. we need to read the protocol header again.
+                     ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+                 }
+             }
+
+             @Override
+             public void onTransportFailure(IOException error) {
+                 next.onFailure(error);
+             }
+
+             @Override
+             void onFailure(Throwable error) {
+                 next.onFailure(error);
+             }
+
+             /**
+              * Advances the exchange by one step.
+              *
+              * @return the sasl object while the exchange is still running, null once it is over
+              */
+             private Sasl processSaslEvent(Sasl sasl) {
+                 if (sasl.getOutcome() == Sasl.SaslOutcome.PN_SASL_OK) {
+                     next.onSuccess(null);
+                     return null;
+                 }
+                 final HashSet<String> offered = new HashSet<String>(Arrays.asList(sasl.getRemoteMechanisms()));
+                 if (authSent || offered.isEmpty()) {
+                     // response already sent, or the remote has not offered anything yet
+                     return sasl;
+                 }
+                 if (offered.contains("PLAIN")) {
+                     authSent = true;
+                     final Buffer response = plainResponse();
+                     sasl.setMechanisms(new String[]{"PLAIN"});
+                     sasl.send(response.data, response.offset, response.length);
+                     return sasl;
+                 }
+                 if (offered.contains("ANONYMOUS")) {
+                     authSent = true;
+                     sasl.setMechanisms(new String[]{"ANONYMOUS"});
+                     sasl.send(new byte[0], 0, 0);
+                     return sasl;
+                 }
+                 next.onFailure(Support.illegalState("Remote does not support plain password authentication."));
+                 return null;
+             }
+
+             /** Builds the PLAIN response: a zero byte, the user, a zero byte and the password (if any). */
+             private Buffer plainResponse() {
+                 final DataByteArrayOutputStream out = new DataByteArrayOutputStream();
+                 try {
+                     out.writeByte(0);
+                     out.write(new UTF8Buffer(options.getUser()));
+                     out.writeByte(0);
+                     if (options.getPassword() != null) {
+                         out.write(new UTF8Buffer(options.getPassword()));
+                     }
+                 } catch (IOException e) {
+                     throw new RuntimeException(e);
+                 }
+                 return out.toBuffer();
+             }
+         });
+     }
+ }
+
+ /**
+  * Listener installed on accepted transports. It inspects the first protocol
+  * header: protocol id 3 starts a SASL exchange driven by the
+  * {@link AmqpListener}; any other header (or a missing listener) hands the
+  * transport over to a plain {@link AmqpTransportListener}.
+  */
+ class SaslServerListener extends AmqpTransportListener {
+     // server side SASL object while an exchange is in progress, otherwise null
+     Sasl sasl;
+
+     /**
+      * Handles a decoded command. A failure while setting up SASL is reported
+      * through {@code onFailure} and the command is then dropped instead of
+      * being processed as if nothing had happened.
+      *
+      * @param command the decoded {@link AmqpHeader} or frame buffer
+      */
+     @Override
+     public void onTransportCommand(Object command) {
+         try {
+             if (command.getClass() == AmqpHeader.class) {
+                 AmqpHeader header = (AmqpHeader)command;
+                 switch( header.getProtocolId() ) {
+                     case 3: // Client will be using SASL for auth..
+                         if( listener!=null ) {
+                             sasl = listener.processSaslConnect(protonTransport);
+                             break;
+                         }
+                         // no listener to run the exchange: deliberately falls through
+                     default:
+                         AmqpTransportListener listener = new AmqpTransportListener();
+                         hawtdispatchTransport.setTransportListener(listener);
+                         listener.onTransportCommand(command);
+                         return;
+                 }
+                 command = header.getBuffer();
+             }
+         } catch (Exception e) {
+             onFailure(e);
+             // Stop here: continuing would feed the command to the engine after a failed SASL set-up.
+             return;
+         }
+         super.onTransportCommand(command);
+     }
+
+     /**
+      * Advances the SASL exchange; once it is over the codec is told to expect
+      * a protocol header again and a plain listener takes over.
+      */
+     @Override
+     void process() {
+         if (sasl != null) {
+             sasl = listener.processSaslEvent(sasl);
+         }
+         if (sasl == null) {
+             // once sasl handshake is done.. we need to read the protocol header again.
+             ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+             hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
+         }
+     }
+ }
+
+ /**
+  * Wraps an already accepted transport; the new object uses that
+  * transport's dispatch queue.
+  *
+  * @param transport the accepted HawtDispatch transport
+  * @return the new AMQP transport
+  */
+ static public AmqpTransport accept(Transport transport) {
+     final AmqpTransport created = new AmqpTransport(transport.getDispatchQueue());
+     return created.accepted(transport);
+ }
+
+ private AmqpTransport accepted(final Transport transport) {
+ state = CONNECTED;
+ bind(transport);
+ hawtdispatchTransport.setTransportListener(new SaslServerListener());
+ return this;
+ }
+
+ /**
+  * Remembers the HawtDispatch transport, creates a proton transport bound to
+  * the connection and installs an {@link AmqpProtocolCodec} unless the
+  * transport already has a codec.
+  *
+  * @param transport the HawtDispatch transport to use
+  */
+ private void bind(final Transport transport) {
+     this.hawtdispatchTransport = transport;
+     this.protonTransport = (ProtonJTransport) org.apache.qpid.proton.engine.Transport.Factory.create();
+     this.protonTransport.bind(connection);
+     if( transport.getProtocolCodec()!=null ) {
+         // keep the codec that is already configured
+         return;
+     }
+     try {
+         transport.setProtocolCodec(new AmqpProtocolCodec());
+     } catch (Exception e) {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Queues a task on the {@code defers} source unless it is already queued.
+  *
+  * @param defer the task to run later
+  */
+ public void defer(Defer defer) {
+     if( defer.defered ) {
+         // already queued and not run yet
+         return;
+     }
+     defer.defered = true;
+     defers.merge(defer);
+ }
+
+ /**
+  * Schedules a deferred pass that moves pending proton output to the
+  * HawtDispatch transport. Asserts that the caller runs on this transport's queue.
+  */
+ public void pumpOut() {
+     assertExecuting();
+     this.defer(deferedPumpOut);
+ }
+
+ /** The task queued by {@code pumpOut()}; it simply runs {@code doPumpOut()}. */
+ private Defer deferedPumpOut = new Defer() {
+     public void run() {
+         AmqpTransport.this.doPumpOut();
+     }
+ };
+
+ private void doPumpOut() {
+ switch(state) {
+ case CONNECTING:
+ case CONNECTED:
+ break;
+ default:
+ return;
+ }
+
+ int size = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize();
+ byte data[] = new byte[size];
+ boolean done = false;
+ int pumped = 0;
+ while( !done && !hawtdispatchTransport.full() ) {
+ int count = protonTransport.output(data, 0, size);
+ if( count > 0 ) {
+ pumped += count;
+ boolean accepted = hawtdispatchTransport.offer(new Buffer(data, 0, count));
+ assert accepted: "Should be accepted since the transport was not full";
+ } else {
+ done = true;
+ }
+ }
+ if( pumped > 0 && !hawtdispatchTransport.full() ) {
+ listener.processRefill();
+ }
+ }
+
+ /**
+  * SASL object consulted by {@link #fireListenerEvents()}. It is not assigned
+  * anywhere in this class; presumably external code sets it - TODO confirm.
+  */
+ public Sasl sasl;
+
+ /**
+  * Fires the watches and then lets the listener react to the current engine
+  * state: a pending SASL exchange, the connection, every session, every link
+  * and every delivery on the work list, followed by a refill notification.
+  * Listener callbacks are skipped while no listener is set.
+  */
+ public void fireListenerEvents() {
+     fireWatches();
+
+     if( sasl!=null && listener!=null ) {
+         sasl = listener.processSaslEvent(sasl);
+         if( sasl==null ) {
+             // once sasl handshake is done.. we need to read the protocol header again.
+             ((AmqpProtocolCodec)this.hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+         }
+     }
+
+     // EndpointContext.fireListenerEvents tolerates a null listener itself.
+     context(connection).fireListenerEvents(listener);
+
+     for( Session session = connection.sessionHead(ALL_SET, ALL_SET); session != null; session = session.next(ALL_SET, ALL_SET) ) {
+         context(session).fireListenerEvents(listener);
+     }
+
+     for( Link link = connection.linkHead(ALL_SET, ALL_SET); link != null; link = link.next(ALL_SET, ALL_SET) ) {
+         context(link).fireListenerEvents(listener);
+     }
+
+     Delivery delivery = connection.getWorkHead();
+     while( delivery != null ) {
+         if( listener!=null ) {
+             listener.processDelivery(delivery);
+         }
+         delivery = delivery.getWorkNext();
+     }
+
+     if( listener!=null ) {
+         listener.processRefill();
+     }
+ }
+
+
+ /** @return the proton connection owned by this transport */
+ public ProtonJConnection connection() {
+     return this.connection;
+ }
+
+ /** Receiver of the engine and transport events; a default {@link AmqpListener} until replaced. */
+ AmqpListener listener = new AmqpListener();
+
+ /** @return the listener currently receiving events */
+ public AmqpListener getListener() {
+     return this.listener;
+ }
+
+ /** @param listener the listener that should receive events from now on */
+ public void setListener(final AmqpListener listener) {
+     this.listener = listener;
+ }
+
+ /**
+  * Returns the {@link EndpointContext} stored on the endpoint, creating and
+  * storing a new one the first time the endpoint is seen.
+  *
+  * @param endpoint the endpoint whose context is wanted
+  * @return the endpoint's context, never null
+  */
+ public EndpointContext context(Endpoint endpoint) {
+     final EndpointContext existing = (EndpointContext) endpoint.getContext();
+     if( existing != null ) {
+         return existing;
+     }
+     final EndpointContext created = new EndpointContext(this, endpoint);
+     endpoint.setContext(created);
+     return created;
+ }
+
+ /**
+  * Bridges HawtDispatch transport events to the proton engine and the
+  * {@link AmqpListener}: incoming buffers are fed to the proton transport,
+  * listener events are fired and pending output is pumped back out.
+  */
+ class AmqpTransportListener extends DefaultTransportListener {
+
+     /** Forwards the connected notification to the listener, if one is set. */
+     @Override
+     public void onTransportConnected() {
+         if( listener!=null ) {
+             listener.processTransportConnected();
+         }
+     }
+
+     /** Forwards the refill notification to the listener, if one is set. */
+     @Override
+     public void onRefill() {
+         if( listener!=null ) {
+             listener.processRefill();
+         }
+     }
+
+     /**
+      * Feeds a decoded command into the proton transport, then processes the
+      * resulting events and schedules an output pump. Commands are ignored
+      * unless the state is CONNECTED; exceptions are routed to {@code onFailure}.
+      *
+      * @param command an {@link AmqpHeader} or a frame {@link Buffer}
+      */
+     @Override
+     public void onTransportCommand(Object command) {
+         if( state != CONNECTED ) {
+             return;
+         }
+         try {
+             Buffer buffer;
+             if (command.getClass() == AmqpHeader.class) {
+                 buffer = ((AmqpHeader) command).getBuffer();
+             } else {
+                 buffer = (Buffer) command;
+             }
+             ByteBuffer bbuffer = buffer.toByteBuffer();
+             // The engine's input buffer may be smaller than the data, so pour repeatedly.
+             do {
+                 ByteBuffer input = protonTransport.getInputBuffer();
+                 ByteBufferUtils.pour(bbuffer, input);
+                 protonTransport.processInput();
+             } while (bbuffer.remaining() > 0);
+             process();
+             pumpOut();
+         } catch (Exception e) {
+             onFailure(e);
+         }
+     }
+
+     /** Reacts to freshly processed input; subclasses override this during SASL. */
+     void process() {
+         fireListenerEvents();
+     }
+
+     /**
+      * Records a transport failure while CONNECTED and notifies the listener.
+      * The watches are fired even when no listener is set, so callbacks
+      * waiting on the failure still get to see it.
+      */
+     @Override
+     public void onTransportFailure(IOException error) {
+         if( state==CONNECTED ) {
+             failure = error;
+             if( listener!=null ) {
+                 listener.processTransportFailure(error);
+             }
+             fireWatches();
+         }
+     }
+
+     /**
+      * Records a processing failure and notifies the listener. The watches are
+      * fired even when no listener is set.
+      */
+     void onFailure(Throwable error) {
+         failure = error;
+         if( listener!=null ) {
+             listener.processFailure(error);
+         }
+         fireWatches();
+     }
+ }
+
+ /**
+  * Starts disconnecting when the transport is CONNECTING or CONNECTED. The
+  * state becomes DISCONNECTING and, once the HawtDispatch transport has
+  * stopped, DISCONNECTED; both transports are then released and the watches
+  * fired. If no transport was ever bound there is nothing to stop, so the
+  * state moves to DISCONNECTED right away instead of staying DISCONNECTING.
+  * Must be called on this transport's dispatch queue.
+  */
+ public void disconnect() {
+     assertExecuting();
+     if( state == CONNECTING || state==CONNECTED) {
+         state = DISCONNECTING;
+         if( hawtdispatchTransport!=null ) {
+             hawtdispatchTransport.stop(new Task(){
+                 public void run() {
+                     state = DISCONNECTED;
+                     hawtdispatchTransport = null;
+                     protonTransport = null;
+                     fireWatches();
+                 }
+             });
+         } else {
+             // e.g. createTransport failed before bind(): no stop callback will ever arrive
+             state = DISCONNECTED;
+             fireWatches();
+         }
+     }
+ }
+
+ /** @return the dispatch queue this transport was created with */
+ public DispatchQueue queue() {
+     return this.queue;
+ }
+
+ /** Delegates to {@code assertExecuting()} of this transport's dispatch queue. */
+ public void assertExecuting() {
+     final DispatchQueue own = queue();
+     own.assertExecuting();
+ }
+
+ /**
+  * Registers a one-shot callback for the end of the connect phase. It gets
+  * {@code onFailure} as soon as a failure has been recorded, otherwise
+  * {@code onSuccess(null)} once the state is no longer CONNECTING.
+  *
+  * @param cb the callback to notify
+  */
+ public void onTransportConnected(final Callback<Void> cb) {
+     addWatch(new Watch() {
+         @Override
+         public boolean execute() {
+             if( failure != null ) {
+                 cb.onFailure(failure);
+                 return true;
+             }
+             if( state == CONNECTING ) {
+                 // still connecting: keep watching
+                 return false;
+             }
+             cb.onSuccess(null);
+             return true;
+         }
+     });
+ }
+
+ /**
+  * Registers a one-shot callback that receives {@code onSuccess(null)} once
+  * the state is DISCONNECTED.
+  *
+  * @param cb the callback to notify
+  */
+ public void onTransportDisconnected(final Callback<Void> cb) {
+     addWatch(new Watch() {
+         @Override
+         public boolean execute() {
+             if( state != DISCONNECTED ) {
+                 return false;
+             }
+             cb.onSuccess(null);
+             return true;
+         }
+     });
+ }
+
+ /**
+  * Registers a one-shot callback that receives the recorded failure through
+  * {@code onSuccess} once a failure exists.
+  *
+  * @param cb the callback to hand the failure to
+  */
+ public void onTransportFailure(final Callback<Throwable> cb) {
+     addWatch(new Watch() {
+         @Override
+         public boolean execute() {
+             if( failure == null ) {
+                 return false;
+             }
+             cb.onSuccess(failure);
+             return true;
+         }
+     });
+ }
+
+ /** @return the recorded failure, or null when none has been recorded */
+ public Throwable getFailure() {
+     return this.failure;
+ }
+
+ /** @param protocolTracer the tracer to install on the proton transport */
+ public void setProtocolTracer(final ProtocolTracer protocolTracer) {
+     this.protonTransport.setProtocolTracer(protocolTracer);
+ }
+
+ /** @return the tracer currently installed on the proton transport */
+ public ProtocolTracer getProtocolTracer() {
+     return this.protonTransport.getProtocolTracer();
+ }
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * A task that can be queued for later execution. The {@code defered} flag
+ * records whether the task is currently queued.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class Defer extends Task {
+    /** True while the task is queued and has not been run yet. */
+    boolean defered;
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * Per-endpoint state kept by an {@link AmqpTransport}: an optional
+ * attachment plus a flag that prevents overlapping listener callbacks.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class EndpointContext {
+
+    private final AmqpTransport transport;
+    private final Endpoint endpoint;
+    private Object attachment;
+    /** True from the moment a listener callback is started until its completion task runs. */
+    boolean listenerProcessing;
+
+    /**
+     * @param transport the transport the endpoint belongs to
+     * @param endpoint the endpoint this context describes
+     */
+    public EndpointContext(AmqpTransport transport, Endpoint endpoint) {
+        this.transport = transport;
+        this.endpoint = endpoint;
+    }
+
+    /** Completion task given to the listener; re-enables callbacks and schedules an output pump. */
+    class ProcessedTask extends Task {
+        @Override
+        public void run() {
+            transport.assertExecuting();
+            listenerProcessing = false;
+            transport.pumpOut();
+        }
+    }
+
+    /**
+     * Notifies the listener when the remote side has opened an endpoint that
+     * is still locally uninitialized, or closed one that is still locally
+     * active. Nothing is sent while an earlier callback has not completed.
+     * Afterwards the attachment is run if it is a {@link Task}.
+     *
+     * @param listener the listener to notify; may be null
+     */
+    public void fireListenerEvents(AmqpListener listener) {
+        if( listener!=null && !listenerProcessing ) {
+            final EndpointState local = endpoint.getLocalState();
+            final EndpointState remote = endpoint.getRemoteState();
+            final boolean remotelyOpened =
+                local == EndpointState.UNINITIALIZED && remote != EndpointState.UNINITIALIZED;
+            final boolean remotelyClosed =
+                local == EndpointState.ACTIVE && remote == EndpointState.CLOSED;
+
+            if( remotelyOpened ) {
+                listenerProcessing = true;
+                listener.processRemoteOpen(endpoint, new ProcessedTask());
+            } else if( remotelyClosed ) {
+                listenerProcessing = true;
+                listener.processRemoteClose(endpoint, new ProcessedTask());
+            }
+        }
+        if( attachment instanceof Task ) {
+            ((Task) attachment).run();
+        }
+    }
+
+    /** @return the attachment, or null if none is set */
+    public Object getAttachment() {
+        return this.attachment;
+    }
+
+    /**
+     * @param clazz the expected type of the attachment
+     * @return the attachment cast to {@code clazz}
+     */
+    public <T> T getAttachment(Class<T> clazz) {
+        final Object current = getAttachment();
+        return clazz.cast(current);
+    }
+
+    /** @param attachment the object to associate with the endpoint */
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Support {
+
+ public static IllegalStateException illegalState(String msg) {
+ return (IllegalStateException) new IllegalStateException(msg).fillInStackTrace();
+ }
+
+ public static IllegalStateException createUnhandledEventError() {
+ return illegalState("Unhandled event.");
+ }
+
+ public static IllegalStateException createListenerNotSetError() {
+ return illegalState("No connection listener set to handle message received from the server.");
+ }
+
+ public static IllegalStateException createDisconnectedError() {
+ return illegalState("Disconnected");
+ }
+
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+/**
+ * A condition that is re-evaluated until it reports that it has triggered.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class Watch {
+    /**
+     * Evaluates the watch.
+     *
+     * @return true if the watch has been triggered
+     */
+    abstract public boolean execute();
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.Task;
+
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class WatchBase {
+
+ private LinkedList<Watch> watches = new LinkedList<Watch>();
+ protected void addWatch(final Watch task) {
+ watches.add(task);
+ fireWatches();
+ }
+
+ protected void fireWatches() {
+ if( !this.watches.isEmpty() ) {
+ Dispatch.getCurrentQueue().execute(new Task(){
+ @Override
+ public void run() {
+ // Lets see if any of the watches are triggered.
+ LinkedList<Watch> tmp = watches;
+ watches = new LinkedList<Watch>();
+ for (Watch task : tmp) {
+ if( !task.execute() ) {
+ watches.add(task);
+ }
+ }
+ }
+ });
+ }
+ }
+
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,292 @@
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.hawtdispatch.test.MessengerServer;
+import org.apache.qpid.proton.message.Message;
+import org.fusesource.hawtdispatch.Task;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+/**
+ * End-to-end smoke test for the HawtDispatch AMQP client API.
+ *
+ * A {@link MessengerServer} is started before each test. The test connects
+ * to it, opens a sender and a receiver on one session, and sends a message
+ * whose reply-to points back at the receiver. Each time an incoming message
+ * settles, the next request is sent, until the countdown reaches zero; then
+ * sender, receiver, session and connection are closed in staggered steps.
+ */
+public class SampleTest {
+
+    private static final Logger _logger = Logger.getLogger(SampleTest.class.getName());
+
+    private MessengerServer server;
+
+    /** Starts a fresh messenger server for the test. */
+    @Before
+    public void startServer() {
+        server = new MessengerServer();
+        server.start();
+    }
+
+    /** Stops the messenger server started by {@link #startServer()}. */
+    @After
+    public void stopServer() {
+        server.stop();
+    }
+
+    /**
+     * Runs the request/response exchange and verifies that the server
+     * received the expected number of messages.
+     *
+     * @throws Exception if the connection options are invalid, or if waiting
+     *         for the disconnect fails (for example on timeout)
+     */
+    @Test
+    public void test() throws Exception {
+        int expected = 10;
+        final AtomicInteger countdown = new AtomicInteger(expected);
+        AmqpConnectOptions options = new AmqpConnectOptions();
+        final String container = UUID.randomUUID().toString();
+        // Let a URISyntaxException from option setup fail the test right away
+        // instead of printing it and connecting with half-configured options.
+        options.setHost(server.getHost(), server.getPort());
+        options.setLocalContainerId(container);
+        options.setUser("anonymous");
+        options.setPassword("changeit");
+        final AmqpConnection conn = AmqpConnection.connect(options);
+        _logger.fine("connection queue");
+        conn.queue().execute(new Task() {
+
+            @Override
+            public void run() {
+                _logger.fine("connection running, setup callbacks");
+                conn.onTransportFailure(new Callback<Throwable>() {
+
+                    @Override
+                    public void onSuccess(Throwable value) {
+                        _logger.fine("transportFailure Success? " + str(value));
+                        conn.close();
+                    }
+
+                    @Override
+                    public void onFailure(Throwable value) {
+                        _logger.fine("transportFailure Trouble! " + str(value));
+                        conn.close();
+                    }
+                });
+
+                conn.onConnected(new Callback<Void>() {
+
+                    @Override
+                    public void onSuccess(Void value) {
+                        _logger.fine("on connect Success! in container " + container);
+                        final AmqpSession session = conn.createSession();
+                        Target rqtarget = new Target();
+                        rqtarget.setAddress("rq-tgt");
+                        final AmqpSender sender = session.createSender(rqtarget, QoS.AT_LEAST_ONCE, "request-yyy");
+                        Source rqsource = new Source();
+                        // NOTE(review): the request source uses the same "rs-src"
+                        // address as the response source below - confirm this is
+                        // intended and not a typo for "rq-src".
+                        rqsource.setAddress("rs-src");
+                        sender.getEndpoint().setSource(rqsource);
+                        Source rssource = new Source();
+                        rssource.setAddress("rs-src");
+                        final AmqpReceiver receiver = session.createReceiver(rssource, QoS.AT_LEAST_ONCE, 10, "response-yyy");
+                        Target rstarget = new Target();
+                        final String address = "rs-tgt";
+                        rstarget.setAddress(address);
+                        receiver.getEndpoint().setTarget(rstarget);
+                        sender.onRemoteClose(new Callback<ErrorCondition>() {
+
+                            @Override
+                            public void onSuccess(ErrorCondition value) {
+                                _logger.fine("sender remote close!" + str(value));
+                            }
+
+                            @Override
+                            public void onFailure(Throwable value) {
+                                _logger.fine("sender remote close Trouble!" + str(value));
+                                conn.close();
+                            }
+
+                        });
+                        receiver.onRemoteClose(new Callback<ErrorCondition>() {
+
+                            @Override
+                            public void onSuccess(ErrorCondition value) {
+                                _logger.fine("receiver remote close!" + str(value));
+                            }
+
+                            @Override
+                            public void onFailure(Throwable value) {
+                                _logger.fine("receiver remote close Trouble!" + str(value));
+                                conn.close();
+                            }
+
+                        });
+
+                        // Sends one numbered request whose reply-to points back at
+                        // this container's receiver.
+                        final Task work = new Task() {
+
+                            private final AtomicInteger count = new AtomicInteger();
+
+                            @Override
+                            public void run() {
+                                Message message = session.createTextMessage("hello world! " + String.valueOf(count.incrementAndGet()));
+                                message.setAddress("amqp://joze/rq-src");
+                                String reply_to = "amqp://" + container + "/" + address;
+                                message.setReplyTo(reply_to);
+                                message.setCorrelationId("correlator");
+                                final MessageDelivery md = sender.send(message);
+                                md.onRemoteStateChange(new Callback<DeliveryState>() {
+
+                                    @Override
+                                    public void onSuccess(DeliveryState value) {
+                                        _logger.fine("delivery remote state change! " + str(value) +
+                                                " local: " + str(md.getLocalState()) +
+                                                " remote: " + str(md.getRemoteState()));
+                                    }
+
+                                    @Override
+                                    public void onFailure(Throwable value) {
+                                        _logger.fine("remote state change Trouble!" + str(value));
+                                        conn.close();
+                                    }
+
+                                });
+                                md.onSettle(new Callback<DeliveryState>() {
+
+                                    @Override
+                                    public void onSuccess(DeliveryState value) {
+                                        _logger.fine("delivery settled! " + str(value) +
+                                                " local: " + str(md.getLocalState()) +
+                                                " remote: " + str(md.getRemoteState()));
+                                        _logger.fine("sender settle mode state " +
+                                                " local receiver " + str(sender.getEndpoint().getReceiverSettleMode()) +
+                                                " local sender " + str(sender.getEndpoint().getSenderSettleMode()) +
+                                                " remote receiver " + str(sender.getEndpoint().getRemoteReceiverSettleMode()) +
+                                                " remote sender " + str(sender.getEndpoint().getRemoteSenderSettleMode()));
+                                    }
+
+                                    @Override
+                                    public void onFailure(Throwable value) {
+                                        _logger.fine("delivery sending Trouble!" + str(value));
+                                        conn.close();
+                                    }
+                                });
+                            }
+
+                        };
+                        receiver.setDeliveryListener(new AmqpDeliveryListener() {
+
+                            @Override
+                            public void onMessageDelivery(MessageDelivery delivery) {
+                                Message message = delivery.getMessage();
+                                _logger.fine("incoming message delivery! " +
+                                        " local " + str(delivery.getLocalState()) +
+                                        " remote " + str(delivery.getRemoteState()) +
+                                        " message " + str(message.getBody()));
+                                delivery.onSettle(new Callback<DeliveryState>() {
+
+                                    @Override
+                                    public void onSuccess(DeliveryState value) {
+                                        _logger.fine("incoming message settled! ");
+                                        int i = countdown.decrementAndGet();
+                                        if (i > 0) {
+                                            _logger.fine("More work " + str(i));
+                                            work.run();
+                                        } else {
+                                            // All replies received: tear the endpoints down
+                                            // one after another, 100 ms apart.
+                                            conn.queue().executeAfter(100, TimeUnit.MILLISECONDS, new Task() {
+
+                                                @Override
+                                                public void run() {
+                                                    _logger.fine("stopping sender");
+                                                    sender.close();
+                                                }
+                                            });
+                                            conn.queue().executeAfter(200, TimeUnit.MILLISECONDS, new Task() {
+
+                                                @Override
+                                                public void run() {
+                                                    _logger.fine("stopping receiver");
+                                                    receiver.close();
+                                                }
+                                            });
+                                            conn.queue().executeAfter(300, TimeUnit.MILLISECONDS, new Task() {
+
+                                                @Override
+                                                public void run() {
+                                                    _logger.fine("stopping session");
+                                                    session.close();
+                                                }
+                                            });
+                                            conn.queue().executeAfter(400, TimeUnit.MILLISECONDS, new Task() {
+
+                                                @Override
+                                                public void run() {
+                                                    _logger.fine("stopping connection");
+                                                    conn.close();
+                                                }
+                                            });
+                                        }
+                                    }
+
+                                    @Override
+                                    public void onFailure(Throwable value) {
+                                        _logger.fine("trouble settling incoming message " + str(value));
+                                        conn.close();
+                                    }
+                                });
+                                delivery.settle();
+                            }
+
+                        });
+
+                        // start the receiver
+                        receiver.resume();
+
+                        // send first message
+                        conn.queue().execute(work);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable value) {
+                        _logger.fine("on connect Failure?" + str(value));
+                        conn.close();
+                    }
+                });
+                _logger.fine("connection setup done");
+            }
+
+        });
+        try {
+            _logger.fine("Waiting...");
+            Future<Void> disconnectedFuture = conn.getDisconnectedFuture();
+            disconnectedFuture.await(10, TimeUnit.SECONDS);
+            _logger.fine("done");
+            assertEquals(expected, server.getMessagesReceived());
+        } catch (Exception e) {
+            _logger.log(Level.SEVERE, "Test failed, possibly due to timeout", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Null-safe string conversion used for log messages.
+     *
+     * @param value any object, possibly null
+     * @return "null" for a null value, otherwise {@code value.toString()}
+     */
+    private String str(Object value) {
+        return String.valueOf(value);
+    }
+
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,135 @@
+package org.apache.qpid.proton.hawtdispatch.test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.qpid.proton.InterruptException;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.messenger.Tracker;
+
+/**
+ * Minimal AMQP echo server for tests, built on a Proton {@link Messenger}
+ * running in its own daemon thread.
+ *
+ * Every incoming message is counted and accepted, or rejected when its body
+ * equals {@link #REJECT_ME}. A message that carries a reply-to address is
+ * sent back to that address.
+ */
+public class MessengerServer {
+    /** Message body that makes the server reject the delivery. */
+    public static final String REJECT_ME = "*REJECT-ME*";
+    /** Milliseconds {@link #stop()} waits for the server thread to finish. */
+    private int timeout = 1000;
+    private String host = "127.0.0.1";
+    private int port = 55555;
+    private Messenger msgr;
+    private final AtomicInteger messagesReceived = new AtomicInteger(0);
+    private final AtomicInteger messagesSent = new AtomicInteger(0);
+    private final AtomicBoolean serverShouldRun = new AtomicBoolean();
+    /** Last unexpected exception seen by the server thread, if any. */
+    private final AtomicReference<Throwable> issues = new AtomicReference<Throwable>();
+    private Thread thread;
+    /** Released once the server thread has finished its startup attempt. */
+    private CountDownLatch serverStart;
+
+    public MessengerServer() {
+    }
+
+    /**
+     * Starts the server thread and blocks until it has attempted to start
+     * the messenger and subscribe to the configured host and port. A failed
+     * startup is recorded and reported by {@link #stop()}.
+     *
+     * @throws IllegalStateException if the server is already running
+     */
+    public void start() {
+        if (!serverShouldRun.compareAndSet(false, true)) {
+            throw new IllegalStateException("started twice");
+        }
+        msgr = Proton.messenger();
+        serverStart = new CountDownLatch(1);
+        thread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    try {
+                        msgr.start();
+                        msgr.subscribe("amqp://~" + host + ":" + port);
+                    } finally {
+                        // Release start() even when startup fails; otherwise
+                        // the caller would wait on the latch forever.
+                        serverStart.countDown();
+                    }
+                    try {
+                        while (serverShouldRun.get()) {
+                            msgr.recv(100);
+                            while (msgr.incoming() > 0) {
+                                Message msg = msgr.get();
+                                messagesReceived.incrementAndGet();
+                                Tracker tracker = msgr.incomingTracker();
+                                if (REJECT_ME.equals(msg.getBody())) {
+                                    msgr.reject(tracker, 0);
+                                } else {
+                                    msgr.accept(tracker, 0);
+                                }
+                                String reply_to = msg.getReplyTo();
+                                if (reply_to != null) {
+                                    // Echo the message back to the requester.
+                                    msg.setAddress(reply_to);
+                                    msgr.put(msg);
+                                    messagesSent.incrementAndGet();
+                                    msgr.settle(msgr.outgoingTracker(), 0);
+                                }
+                            }
+                        }
+                    } finally {
+                        msgr.stop();
+                    }
+                } catch (InterruptException ex) {
+                    // we're done
+                } catch (Exception ex) {
+                    issues.set(ex);
+                }
+            }
+
+        });
+        thread.setName("MessengerServer");
+        thread.setDaemon(true);
+        thread.start();
+        try {
+            serverStart.await();
+        } catch (InterruptedException e) {
+            msgr.interrupt();
+            // Preserve the interrupt for the caller.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Stops the server if it is running: interrupts the messenger, waits up
+     * to the configured timeout for the server thread, and makes sure the
+     * messenger is stopped. Does nothing if the server is not running.
+     *
+     * @throws RuntimeException wrapping any exception the server thread
+     *         recorded while it was running
+     */
+    public void stop() {
+        if (!serverShouldRun.compareAndSet(true, false)) {
+            return;
+        }
+        if (serverStart.getCount() == 0) {
+            msgr.interrupt();
+        }
+        try {
+            thread.join(timeout);
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for the caller and carry on with cleanup.
+            Thread.currentThread().interrupt();
+        }
+        thread = null;
+        if (!msgr.stopped()) {
+            msgr.stop();
+        }
+        Throwable throwable = issues.get();
+        if (throwable != null) {
+            throw new RuntimeException("Messenger server had problems", throwable);
+        }
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /** @return milliseconds {@link #stop()} waits for the server thread */
+    public int getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+    /** @return number of messages taken from the incoming queue so far */
+    public int getMessagesReceived() {
+        return messagesReceived.get();
+    }
+
+    /** @return number of reply messages queued for sending so far */
+    public int getMessagesSent() {
+        return messagesSent.get();
+    }
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org