You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2012/11/28 22:48:40 UTC
svn commit: r1414946 [1/2] - in /qpid/proton/trunk/proton-j: ./
contrib/proton-hawtdispatch/ contrib/proton-hawtdispatch/src/
contrib/proton-hawtdispatch/src/main/
contrib/proton-hawtdispatch/src/main/java/
contrib/proton-hawtdispatch/src/main/java/org...
Author: tross
Date: Wed Nov 28 21:48:33 2012
New Revision: 1414946
URL: http://svn.apache.org/viewvc?rev=1414946&view=rev
Log:
PROTON-22 - HawtDispatch Module
Contributed by Hiram Chirino
Added:
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/pom.xml (with props)
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
Modified:
qpid/proton/trunk/proton-j/pom.xml
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/pom.xml
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/pom.xml?rev=1414946&view=auto
==============================================================================
Binary file - no diff available.
Propchange: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/pom.xml
------------------------------------------------------------------------------
svn:mime-type = application/xml
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+
+import javax.net.ssl.SSLContext;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnectOptions implements Cloneable {
+
+ private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("amqp.thread.keep_alive", ""+1000));
+ private static final long STACK_SIZE = Long.parseLong(System.getProperty("amqp.thread.stack_size", ""+1024*512));
+ private static ThreadPoolExecutor blockingThreadPool;
+
+ public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
+ if( blockingThreadPool == null ) {
+ blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread rc = new Thread(null, r, "AMQP Task", STACK_SIZE);
+ rc.setDaemon(true);
+ return rc;
+ }
+ }) {
+
+ @Override
+ public void shutdown() {
+ // we don't ever shutdown since we are shared..
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ // we don't ever shutdown since we are shared..
+ return Collections.emptyList();
+ }
+ };
+ }
+ return blockingThreadPool;
+ }
+ public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
+ blockingThreadPool = pool;
+ }
+
+ private static final URI DEFAULT_HOST;
+ static {
+ try {
+ DEFAULT_HOST = new URI("tcp://localhost");
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ URI host = DEFAULT_HOST;
+ URI localAddress;
+ SSLContext sslContext;
+ DispatchQueue dispatchQueue;
+ Executor blockingExecutor;
+ int maxReadRate;
+ int maxWriteRate;
+ int trafficClass = TcpTransport.IPTOS_THROUGHPUT;
+ boolean useLocalHost;
+ int receiveBufferSize = 1024*64;
+ int sendBufferSize = 1024*64;
+ String localContainerId;
+ String remoteContainerId;
+ String user;
+ String password;
+
+
+ @Override
+ public AmqpConnectOptions clone() {
+ try {
+ return (AmqpConnectOptions) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getLocalContainerId() {
+ return localContainerId;
+ }
+
+ public void setLocalContainerId(String localContainerId) {
+ this.localContainerId = localContainerId;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getRemoteContainerId() {
+ return remoteContainerId;
+ }
+
+ public void setRemoteContainerId(String remoteContainerId) {
+ this.remoteContainerId = remoteContainerId;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public Executor getBlockingExecutor() {
+ return blockingExecutor;
+ }
+
+ public void setBlockingExecutor(Executor blockingExecutor) {
+ this.blockingExecutor = blockingExecutor;
+ }
+
+ public DispatchQueue getDispatchQueue() {
+ return dispatchQueue;
+ }
+
+ public void setDispatchQueue(DispatchQueue dispatchQueue) {
+ this.dispatchQueue = dispatchQueue;
+ }
+
+ public URI getLocalAddress() {
+ return localAddress;
+ }
+
+ public void setLocalAddress(String localAddress) throws URISyntaxException {
+ this.setLocalAddress(new URI(localAddress));
+ }
+ public void setLocalAddress(URI localAddress) {
+ this.localAddress = localAddress;
+ }
+
+ public int getMaxReadRate() {
+ return maxReadRate;
+ }
+
+ public void setMaxReadRate(int maxReadRate) {
+ this.maxReadRate = maxReadRate;
+ }
+
+ public int getMaxWriteRate() {
+ return maxWriteRate;
+ }
+
+ public void setMaxWriteRate(int maxWriteRate) {
+ this.maxWriteRate = maxWriteRate;
+ }
+
+ public int getReceiveBufferSize() {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize(int receiveBufferSize) {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public URI getHost() {
+ return host;
+ }
+ public void setHost(String host, int port) throws URISyntaxException {
+ this.setHost(new URI("tcp://"+host+":"+port));
+ }
+ public void setHost(String host) throws URISyntaxException {
+ this.setHost(new URI(host));
+ }
+ public void setHost(URI host) {
+ this.host = host;
+ }
+
+ public int getSendBufferSize() {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize(int sendBufferSize) {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public SSLContext getSslContext() {
+ return sslContext;
+ }
+
+ public void setSslContext(SSLContext sslContext) {
+ this.sslContext = sslContext;
+ }
+
+ public int getTrafficClass() {
+ return trafficClass;
+ }
+
+ public void setTrafficClass(int trafficClass) {
+ this.trafficClass = trafficClass;
+ }
+
+ public boolean isUseLocalHost() {
+ return useLocalHost;
+ }
+
+ public void setUseLocalHost(boolean useLocalHost) {
+ this.useLocalHost = useLocalHost;
+ }
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.AmqpListener;
+import org.apache.qpid.proton.hawtdispatch.impl.AmqpTransport;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointError;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.SessionImpl;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnection extends AmqpEndpointBase {
+
+ AmqpTransport transport;
+ ConnectionImpl connection;
+ HashSet<AmqpSender> senders = new HashSet<AmqpSender>();
+ boolean closing = false;
+
+ public static AmqpConnection connect(AmqpConnectOptions options) {
+ return new AmqpConnection(options);
+ }
+
+ private AmqpConnection(AmqpConnectOptions options) {
+ transport = AmqpTransport.connect(options);
+ transport.setListener(new AmqpListener() {
+ @Override
+ public void processDelivery(Delivery delivery) {
+ Attachment attachment = (Attachment) getTransport().context(delivery.getLink()).getAttachment();
+ AmqpLink link = (AmqpLink) attachment.endpoint();
+ link.processDelivery(delivery);
+ }
+
+ @Override
+ public void processRefill() {
+ for(AmqpSender sender: new ArrayList<AmqpSender>(senders)) {
+ sender.pumpDeliveries();
+ }
+ pumpOut();
+ }
+
+ });
+ connection = transport.connection();
+ connection.open();
+ attach();
+ }
+
+ public void waitForConnected() throws Exception {
+ assertNotOnDispatchQueue();
+ getConnectedFuture().await();
+ }
+
+ public Future<Void> getConnectedFuture() {
+ final Promise<Void> rc = new Promise<Void>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onConnected(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onConnected(Callback<Void> cb) {
+ transport.onTransportConnected(cb);
+ }
+
+ @Override
+ protected Endpoint getEndpoint() {
+ return connection;
+ }
+
+ @Override
+ protected AmqpConnection getConnection() {
+ return this;
+ }
+
+ @Override
+ protected AmqpEndpointBase getParent() {
+ return null;
+ }
+
+ public AmqpSession createSession() {
+ assertExecuting();
+ SessionImpl session = connection.session();
+ session.open();
+ pumpOut();
+ return new AmqpSession(this, session);
+ }
+
+ public int getMaxSessions() {
+ return connection.getMaxChannels();
+ }
+
+ public void disconnect() {
+ closing = true;
+ transport.disconnect();
+ }
+
+ public void waitForDisconnected() throws Exception {
+ assertNotOnDispatchQueue();
+ getDisconnectedFuture().await();
+ }
+
+ public Future<Void> getDisconnectedFuture() {
+ final Promise<Void> rc = new Promise<Void>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onDisconnected(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onDisconnected(Callback<Void> cb) {
+ transport.onTransportDisconnected(cb);
+ }
+
+ public TransportState getTransportState() {
+ return transport.getState();
+ }
+
+ public Throwable getTransportFailure() {
+ return transport.getFailure();
+ }
+
+ public Future<Throwable> getTransportFailureFuture() {
+ final Promise<Throwable> rc = new Promise<Throwable>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onTransportFailure(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onTransportFailure(Callback<Throwable> cb) {
+ transport.onTransportFailure(cb);
+ }
+
+ @Override
+ public DispatchQueue queue() {
+ return super.queue();
+ }
+
+ public void setProtocolTracer(ProtocolTracer protocolTracer) {
+ transport.setProtocolTracer(protocolTracer);
+ }
+
+ public ProtocolTracer getProtocolTracer() {
+ return transport.getProtocolTracer();
+ }
+
+ /**
+ * Once the remote end, closes the transport is disconnected.
+ */
+ @Override
+ public void close() {
+ super.close();
+ onRemoteClose(new Callback<EndpointError>() {
+ @Override
+ public void onSuccess(EndpointError value) {
+ disconnect();
+ }
+
+ @Override
+ public void onFailure(Throwable value) {
+ disconnect();
+ }
+ });
+ }
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface AmqpDeliveryListener {
+
+ /**
+ * Caller should suspend/resume the AmqpReceiver to
+ * flow control the delivery of messages.
+ *
+ * @param delivery
+ */
+ void onMessageDelivery(MessageDelivery delivery);
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.*;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointError;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class AmqpEndpointBase extends WatchBase {
+ abstract protected Endpoint getEndpoint();
+ abstract protected AmqpEndpointBase getParent();
+
+ protected AmqpConnection getConnection() {
+ return getParent().getConnection();
+ }
+
+ protected AmqpTransport getTransport() {
+ return getConnection().transport;
+ }
+
+ protected DispatchQueue queue() {
+ return getTransport().queue();
+ }
+
+ protected void assertExecuting() {
+ getTransport().assertExecuting();
+ }
+
+ public void waitForRemoteOpen() throws Exception {
+ assertNotOnDispatchQueue();
+ getRemoteOpenFuture().await();
+ }
+
+ public Future<Void> getRemoteOpenFuture() {
+ final Promise<Void> rc = new Promise<Void>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onRemoteOpen(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onRemoteOpen(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ switch (getEndpoint().getRemoteState()) {
+ case ACTIVE:
+ cb.onSuccess(null);
+ return true;
+ case CLOSED:
+ cb.onFailure(Support.illegalState("closed"));
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ public EndpointError waitForRemoteClose() throws Exception {
+ assertNotOnDispatchQueue();
+ return getRemoteCloseFuture().await();
+ }
+
+ public Future<EndpointError> getRemoteCloseFuture() {
+ final Promise<EndpointError> rc = new Promise<EndpointError>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onRemoteClose(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onRemoteClose(final Callback<EndpointError> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if (getEndpoint().getRemoteState() == EndpointState.CLOSED) {
+ cb.onSuccess(getEndpoint().getRemoteError());
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ public void close() {
+ getEndpoint().close();
+ pumpOut();
+ }
+
+ public EndpointState getRemoteState() {
+ return getEndpoint().getRemoteState();
+ }
+
+ public EndpointError getRemoteError() {
+ return getEndpoint().getRemoteError();
+ }
+
+ static protected EndpointError toError(Throwable value) {
+ return new EndpointError("error", value.toString());
+ }
+
+ class Attachment extends Task {
+ AmqpEndpointBase endpoint() {
+ return AmqpEndpointBase.this;
+ }
+
+ @Override
+ public void run() {
+ fireWatches();
+ }
+ }
+
+ protected void attach() {
+ getTransport().context(getEndpoint()).setAttachment(new Attachment());
+ }
+
+ protected void defer(Defer defer) {
+ getTransport().defer(defer);
+ }
+
+ protected void pumpOut() {
+ getTransport().pumpOut();
+ }
+
+ static protected void assertNotOnDispatchQueue() {
+ assert Dispatch.getCurrentQueue()==null : "Not allowed to be called when executing on a dispatch queue";
+ }
+
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AmqpLink extends AmqpEndpointBase {
+ abstract protected void processDelivery(Delivery delivery);
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.Defer;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.engine.impl.ReceiverImpl;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpReceiver extends AmqpLink {
+
+ final AmqpSession parent;
+ final ReceiverImpl receiver;
+
+ public AmqpReceiver(AmqpSession parent, ReceiverImpl receiver, QoS qos) {
+ this.parent = parent;
+ this.receiver = receiver;
+ attach();
+ }
+
+ @Override
+ protected ReceiverImpl getEndpoint() {
+ return receiver;
+ }
+ @Override
+ protected AmqpSession getParent() {
+ return parent;
+ }
+
+ ByteArrayOutputStream current = new ByteArrayOutputStream();
+
+ @Override
+ protected void processDelivery(Delivery delivery) {
+ if( !delivery.isReadable() ) {
+ System.out.println("it was not readable!");
+ return;
+ }
+
+ if( current==null ) {
+ current = new ByteArrayOutputStream();
+ }
+
+ int count;
+ byte data[] = new byte[1024*4];
+ while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
+ current.write(data, 0, count);
+ }
+
+ // Expecting more deliveries..
+ if( count == 0 ) {
+ return;
+ }
+
+ receiver.advance();
+ Buffer buffer = current.toBuffer();
+ current = null;
+ onMessage(delivery, buffer);
+
+ }
+
+ LinkedList<MessageDelivery> inbound = new LinkedList<MessageDelivery>();
+
+ protected void onMessage(Delivery delivery, Buffer buffer) {
+ MessageDelivery md = new MessageDelivery(buffer) {
+ @Override
+ AmqpLink link() {
+ return AmqpReceiver.this;
+ }
+
+ @Override
+ public void settle() {
+ if( !delivery.isSettled() ) {
+ delivery.disposition(new Accepted());
+ delivery.settle();
+ }
+ drain();
+ }
+ };
+ md.delivery = (DeliveryImpl) delivery;
+ delivery.setContext(md);
+ inbound.add(md);
+ drainInbound();
+ }
+
+ public void drain() {
+ defer(deferedDrain);
+ }
+
+ Defer deferedDrain = new Defer(){
+ public void run() {
+ drainInbound();
+ }
+ };
+ int resumed = 0;
+
+ public void resume() {
+ resumed++;
+ }
+ public void suspend() {
+ resumed--;
+ }
+
+ AmqpDeliveryListener deliveryListener;
+ private void drainInbound() {
+ while( deliveryListener!=null && !inbound.isEmpty() && resumed>0) {
+ deliveryListener.onMessageDelivery(inbound.removeFirst());
+ receiver.flow(1);
+ }
+ }
+
+ public AmqpDeliveryListener getDeliveryListener() {
+ return deliveryListener;
+ }
+
+ public void setDeliveryListener(AmqpDeliveryListener deliveryListener) {
+ this.deliveryListener = deliveryListener;
+ drainInbound();
+ }
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.Defer;
+import org.apache.qpid.proton.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.engine.impl.SenderImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.apache.qpid.proton.type.messaging.Modified;
+import org.apache.qpid.proton.type.messaging.Rejected;
+import org.apache.qpid.proton.type.messaging.Released;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSender extends AmqpLink {
+
+ private byte[] EMPTY_BYTE_ARRAY = new byte[]{};
+ long nextTagId = 0;
+ HashSet<byte[]> tagCache = new HashSet<byte[]>();
+
+ final AmqpSession parent;
+ private final QoS qos;
+ final SenderImpl sender;
+
+ public AmqpSender(AmqpSession parent, SenderImpl sender, QoS qos) {
+ this.parent = parent;
+ this.sender = sender;
+ this.qos = qos;
+ attach();
+ getConnection().senders.add(this);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ getConnection().senders.remove(this);
+ }
+
+ @Override
+ protected SenderImpl getEndpoint() {
+ return sender;
+ }
+
+ @Override
+ protected AmqpSession getParent() {
+ return parent;
+ }
+
+ final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
+ long outboundBufferSize;
+
+ public MessageDelivery send(Message message) {
+ assertExecuting();
+ MessageDelivery rc = new MessageDelivery(message) {
+ @Override
+ AmqpLink link() {
+ return AmqpSender.this;
+ }
+
+ @Override
+ public void redeliver(boolean incrementDeliveryCounter) {
+ super.redeliver(incrementDeliveryCounter);
+ outbound.add(this);
+ outboundBufferSize += initialSize;
+ defer(deferedPumpDeliveries);
+ }
+ };
+ outbound.add(rc);
+ outboundBufferSize += rc.initialSize;
+ pumpDeliveries();
+ pumpOut();
+ return rc;
+ }
+
+ Buffer currentBuffer;
+ DeliveryImpl currentDelivery;
+
+ Defer deferedPumpDeliveries = new Defer() {
+ public void run() {
+ pumpDeliveries();
+ }
+ };
+
+ public long getOverflowBufferSize() {
+ return outboundBufferSize;
+ }
+
+ protected void pumpDeliveries() {
+ assertExecuting();
+ try {
+ while(true) {
+ while( currentBuffer !=null ) {
+ if( sender.getCredit() > 0 ) {
+ int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
+ currentBuffer.moveHead(sent);
+ if( currentBuffer.length == 0 ) {
+ DeliveryImpl current = currentDelivery;
+ MessageDelivery md = (MessageDelivery) current.getContext();
+ currentBuffer = null;
+ currentDelivery = null;
+ if( qos == QoS.AT_MOST_ONCE ) {
+ current.settle();
+ } else {
+ sender.advance();
+ }
+ md.fireWatches();
+ }
+ } else {
+ return;
+ }
+ }
+
+ if( outbound.isEmpty() ) {
+ return;
+ }
+
+ final MessageDelivery md = outbound.removeFirst();
+ outboundBufferSize -= md.initialSize;
+ currentBuffer = md.encoded();
+ if( qos == QoS.AT_MOST_ONCE ) {
+ currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+ } else {
+ final byte[] tag = nextTag();
+ currentDelivery = sender.delivery(tag, 0, tag.length);
+ }
+ md.delivery = currentDelivery;
+ currentDelivery.setContext(md);
+ }
+ } finally {
+ fireWatches();
+ }
+ }
+
+ @Override
+ protected void processDelivery(Delivery delivery) {
+ final MessageDelivery md = (MessageDelivery) delivery.getContext();
+ if( delivery.remotelySettled() ) {
+ if( delivery.getTag().length > 0 ) {
+ checkinTag(delivery.getTag());
+ }
+
+ final DeliveryState state = delivery.getRemoteState();
+ if( state==null || state instanceof Accepted) {
+ if( !delivery.remotelySettled() ) {
+ delivery.disposition(new Accepted());
+ }
+ } else if( state instanceof Rejected) {
+ // re-deliver /w incremented delivery counter.
+ md.delivery = null;
+ md.incrementDeliveryCount();
+ outbound.addLast(md);
+ } else if( state instanceof Released) {
+ // re-deliver && don't increment the counter.
+ md.delivery = null;
+ outbound.addLast(md);
+ } else if( state instanceof Modified) {
+ Modified modified = (Modified) state;
+ if ( modified.getDeliveryFailed() ) {
+ // increment delivery counter..
+ md.incrementDeliveryCount();
+ }
+ }
+ delivery.settle();
+ }
+ md.fireWatches();
+ }
+
+ byte[] nextTag() {
+ byte[] rc;
+ if (tagCache != null && !tagCache.isEmpty()) {
+ final Iterator<byte[]> iterator = tagCache.iterator();
+ rc = iterator.next();
+ iterator.remove();
+ } else {
+ try {
+ rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return rc;
+ }
+
+ void checkinTag(byte[] data) {
+ if( tagCache.size() < 1024 ) {
+ tagCache.add(data);
+ }
+ }
+
+ public void onOverflowBufferDrained(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if (outboundBufferSize==0) {
+ cb.onSuccess(null);
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.impl.ReceiverImpl;
+import org.apache.qpid.proton.engine.impl.SenderImpl;
+import org.apache.qpid.proton.engine.impl.SessionImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.messaging.*;
+import org.apache.qpid.proton.type.transport.SenderSettleMode;
+
+import java.util.UUID;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSession extends AmqpEndpointBase {
+
+ final AmqpConnection parent;
+ final SessionImpl session;
+
+
+ public AmqpSession(AmqpConnection parent, SessionImpl session) {
+ this.parent = parent;
+ this.session = session;
+ attach();
+ }
+
+ @Override
+ protected Endpoint getEndpoint() {
+ return session;
+ }
+
+ @Override
+ protected AmqpConnection getParent() {
+ return parent;
+ }
+
+ public AmqpSender createSender(Target target) {
+ return createSender(target, QoS.AT_LEAST_ONCE);
+ }
+
+ public AmqpSender createSender(Target target, QoS qos) {
+ return createSender(target, qos, UUID.randomUUID().toString());
+ }
+
+ public AmqpSender createSender(Target target, QoS qos, String name) {
+ assertExecuting();
+ SenderImpl sender = session.sender(name);
+ attach();
+// Source source = new Source();
+// source.setAddress(UUID.randomUUID().toString());
+// sender.setSource(source);
+ sender.setTarget(target);
+ configureQos(sender, qos);
+ sender.open();
+ pumpOut();
+ return new AmqpSender(this, sender, qos);
+ }
+
+ public AmqpReceiver createReceiver(Source source) {
+ return createReceiver(source, QoS.AT_LEAST_ONCE);
+ }
+
+ public AmqpReceiver createReceiver(Source source, QoS qos) {
+ return createReceiver(source, qos, 100);
+ }
+
+ public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch) {
+ return createReceiver(source, qos, prefetch, UUID.randomUUID().toString());
+ }
+
+ public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch, String name) {
+ assertExecuting();
+ ReceiverImpl receiver = session.receiver(name);
+ receiver.setSource(source);
+// Target target = new Target();
+// target.setAddress(UUID.randomUUID().toString());
+// receiver.setTarget(target);
+ receiver.flow(prefetch);
+ configureQos(receiver, qos);
+ receiver.open();
+ pumpOut();
+ return new AmqpReceiver(this, receiver, qos);
+ }
+
+ private void configureQos(Link link, QoS qos) {
+ switch (qos) {
+ case AT_MOST_ONCE:
+ link.setSenderSettleMode(SenderSettleMode.SETTLED);
+ link.setReceiverSettleMode(SenderSettleMode.UNSETTLED);
+ break;
+ case AT_LEAST_ONCE:
+ link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ link.setReceiverSettleMode(SenderSettleMode.SETTLED);
+ break;
+ case EXACTLY_ONCE:
+ link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ link.setReceiverSettleMode(SenderSettleMode.MIXED);
+ break;
+ }
+ }
+
+ public Message createTextMessage(String value) {
+ Message msg = new Message();
+ Section body = new AmqpValue(value);
+ msg.setBody(body);
+ return msg;
+ }
+
+ public Message createBinaryMessage(byte value[]) {
+ return createBinaryMessage(value, 0, value.length);
+ }
+
+ public Message createBinaryMessage(byte value[], int offset, int len) {
+ Message msg = new Message();
+ Data body = new Data(new Binary(value, offset,len));
+ msg.setBody(body);
+ return msg;
+ }
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * <p>
+ * Function Result that carries one value.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Callback<T> {
+ public void onSuccess(T value);
+ public void onFailure(Throwable value);
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * <p>
+ * Function Result that carries one value.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class ChainedCallback<In,Out> implements Callback<In> {
+
+ public final Callback<Out> next;
+
+ public ChainedCallback(Callback<Out> next) {
+ this.next = next;
+ }
+
+ public void onFailure(Throwable value) {
+ next.onFailure(value);
+ }
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class DeliveryAttachment {
+ abstract void processDelivery(Delivery delivery);
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>A simplified Future function results interface.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Future<T> {
+ T await() throws Exception;
+ T await(long amount, TimeUnit unit) throws Exception;
+ void then(Callback<T> callback);
+
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.DroppingWritableBuffer;
+import org.apache.qpid.proton.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.Task;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class MessageDelivery extends WatchBase {
+
+ final int initialSize;
+ private Message message;
+ private Buffer encoded;
+ public DeliveryImpl delivery;
+ private int sizeHint = 1024*4;
+
+ static Buffer encode(Message message, int sizeHint) {
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[sizeHint]);
+ DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+ int c = message.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+ if( overflow.position() > 0 ) {
+ buffer = ByteBuffer.wrap(new byte[sizeHint+overflow.position()]);
+ c = message.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+ }
+ return new Buffer(buffer.array(), 0, c);
+ }
+
+ static Message decode(Buffer buffer) {
+ Message msg = new Message();
+ int offset = buffer.offset;
+ int len = buffer.length;
+ while( len > 0 ) {
+ int decoded = msg.decode(buffer.data, offset, len);
+ assert decoded > 0: "Make progress decoding the message";
+ offset += decoded;
+ len -= decoded;
+ }
+ return msg;
+ }
+
+ public MessageDelivery(Message message) {
+ this(message, encode(message, 1024*4));
+ }
+
+ public MessageDelivery(Buffer encoded) {
+ this(null, encoded);
+ }
+
+ public MessageDelivery(Message message, Buffer encoded) {
+ this.message = message;
+ this.encoded = encoded;
+ sizeHint = this.encoded.length;
+ initialSize = sizeHint;
+ }
+
+ public Message getMessage() {
+ if( message == null ) {
+ message = decode(encoded);
+ }
+ return message;
+ }
+
+ public Buffer encoded() {
+ if( encoded == null ) {
+ encoded = encode(message, sizeHint);
+ sizeHint = encoded.length;
+ }
+ return encoded;
+ }
+
+ public boolean isSettled() {
+ return delivery!=null && delivery.isSettled();
+ }
+
+ public DeliveryState getRemoteState() {
+ return delivery==null ? null : delivery.getRemoteState();
+ }
+
+ public DeliveryState getLocalState() {
+ return delivery==null ? null : delivery.getLocalState();
+ }
+
+ public void onEncoded(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if( delivery!=null ) {
+ cb.onSuccess(null);
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ /**
+ * @return the remote delivery state when it changes.
+ * @throws Exception
+ */
+ public DeliveryState getRemoteStateChange() throws Exception {
+ AmqpEndpointBase.assertNotOnDispatchQueue();
+ return getRemoteStateChangeFuture().await();
+ }
+
+ /**
+ * @return the future remote delivery state when it changes.
+ */
+ public Future<DeliveryState> getRemoteStateChangeFuture() {
+ final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+ link().queue().execute(new Task() {
+ @Override
+ public void run() {
+ onRemoteStateChange(rc);
+ }
+ });
+ return rc;
+ }
+
+ abstract AmqpLink link();
+
+ boolean watchingRemoteStateChange;
+ public void onRemoteStateChange(final Callback<DeliveryState> cb) {
+ watchingRemoteStateChange = true;
+ final DeliveryState original = delivery.getRemoteState();
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if (original == null) {
+ if( delivery.getRemoteState()!=null ) {
+ cb.onSuccess(delivery.getRemoteState());
+ watchingRemoteStateChange = false;
+ return true;
+ }
+ } else {
+ if( !original.equals(delivery.getRemoteState()) ) {
+ cb.onSuccess(delivery.getRemoteState());
+ watchingRemoteStateChange = false;
+ return true;
+ }
+ }
+ return false;
+ }
+ });
+ }
+
+ /**
+ * @return the remote delivery state once settled.
+ * @throws Exception
+ */
+ public DeliveryState getSettle() throws Exception {
+ AmqpEndpointBase.assertNotOnDispatchQueue();
+ return getSettleFuture().await();
+ }
+
+ /**
+ * @return the future remote delivery state once the delivery is settled.
+ */
+ public Future<DeliveryState> getSettleFuture() {
+ final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+ link().queue().execute(new Task() {
+ @Override
+ public void run() {
+ onSettle(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onSettle(final Callback<DeliveryState> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if( delivery!=null && delivery.isSettled() ) {
+ cb.onSuccess(delivery.getRemoteState());
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ @Override
+ protected void fireWatches() {
+ super.fireWatches();
+ }
+
+ void incrementDeliveryCount() {
+ Message msg = getMessage();
+ msg.setDeliveryCount(msg.getDeliveryCount()+1);
+ encoded = null;
+ }
+
+ public void redeliver(boolean incrementDeliveryCounter) {
+ if( incrementDeliveryCounter ) {
+ incrementDeliveryCount();
+ }
+ }
+
+ public void settle() {
+ if( !delivery.isSettled() ) {
+ delivery.settle();
+ }
+ }
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Promise<T> implements Callback<T>, Future<T> {
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+ Callback<T> next;
+ Throwable error;
+ T value;
+
+ public void onFailure(Throwable value) {
+ Callback<T> callback = null;
+ synchronized(this) {
+ error = value;
+ latch.countDown();
+ callback = next;
+ }
+ if( callback!=null ) {
+ callback.onFailure(value);
+ }
+ }
+
+ public void onSuccess(T value) {
+ Callback<T> callback = null;
+ synchronized(this) {
+ this.value = value;
+ latch.countDown();
+ callback = next;
+ }
+ if( callback!=null ) {
+ callback.onSuccess(value);
+ }
+ }
+
+ public void then(Callback<T> callback) {
+ boolean fire = false;
+ synchronized(this) {
+ next = callback;
+ if( latch.getCount() == 0 ) {
+ fire = true;
+ }
+ }
+ if( fire ) {
+ if( error!=null ) {
+ callback.onFailure(error);
+ } else {
+ callback.onSuccess(value);
+ }
+ }
+ }
+
+ public T await(long amount, TimeUnit unit) throws Exception {
+ if( latch.await(amount, unit) ) {
+ return get();
+ } else {
+ throw new TimeoutException();
+ }
+ }
+
+ public T await() throws Exception {
+ latch.await();
+ return get();
+ }
+
+ private T get() throws Exception {
+ Throwable e = error;
+ if( e !=null ) {
+ if( e instanceof RuntimeException ) {
+ throw (RuntimeException) e;
+ } else if( e instanceof Exception) {
+ throw (Exception) e;
+ } else if( e instanceof Error) {
+ throw (Error) e;
+ } else {
+ // don't expect to hit this case.
+ throw new RuntimeException(e);
+ }
+ }
+ return value;
+ }
+
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public enum QoS {
+ AT_MOST_ONCE,
+ AT_LEAST_ONCE,
+ EXACTLY_ONCE
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public enum TransportState {
+ CREATED,
+ CONNECTING,
+ CONNECTED,
+ DISCONNECTING,
+ DISCONNECTED
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpHeader {
+
+ static final Buffer PREFIX = new Buffer(new byte[]{
+ 'A', 'M', 'Q', 'P'
+ });
+
+ private Buffer buffer;
+
+ public AmqpHeader(){
+ this(new Buffer(new byte[]{
+ 'A', 'M', 'Q', 'P', 0, 1, 0, 0
+ }));
+ }
+
+ public AmqpHeader(Buffer buffer){
+ setBuffer(buffer);
+ }
+
+ public int getProtocolId() {
+ return buffer.get(4) & 0xFF;
+ }
+ public void setProtocolId(int value) {
+ buffer.data[buffer.offset+4] = (byte) value;
+ }
+
+ public int getMajor() {
+ return buffer.get(5) & 0xFF;
+ }
+ public void setMajor(int value) {
+ buffer.data[buffer.offset+5] = (byte) value;
+ }
+
+ public int getMinor() {
+ return buffer.get(6) & 0xFF;
+ }
+ public void setMinor(int value) {
+ buffer.data[buffer.offset+6] = (byte) value;
+ }
+
+ public int getRevision() {
+ return buffer.get(7) & 0xFF;
+ }
+ public void setRevision(int value) {
+ buffer.data[buffer.offset+7] = (byte) value;
+ }
+
+ public Buffer getBuffer() {
+ return buffer;
+ }
+ public void setBuffer(Buffer value) {
+ if( !value.startsWith(PREFIX) || value.length()!=8 ) {
+ throw new IllegalArgumentException("Not an AMQP header buffer");
+ }
+ buffer = value.buffer();
+ }
+
+
+ @Override
+ public String toString() {
+ return buffer.toString();
+ }
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.EndpointImpl;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.fusesource.hawtdispatch.Task;
+
+import java.io.IOException;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class AmqpListener {
+
+ public Sasl processSaslConnect(TransportImpl transport) {
+ return null;
+ }
+
+ public Sasl processSaslEvent(Sasl sasl) {
+ return sasl;
+ }
+
+ public void processRemoteOpen(Endpoint endpoint, Task onComplete) {
+ ((EndpointImpl)endpoint).setLocalError(new EndpointError("error", "Not supported"));
+ endpoint.close();
+ onComplete.run();
+ }
+
+ public void processRemoteClose(Endpoint endpoint, Task onComplete) {
+ endpoint.close();
+ onComplete.run();
+ }
+
+ public void processDelivery(Delivery delivery){
+ }
+
+ public void processTransportConnected() {
+ }
+
+ public void processTransportFailure(IOException e) {
+ this.processFailure(e);
+ }
+
+ public void processFailure(Throwable e) {
+ e.printStackTrace();
+ }
+
+ public void processRefill() {
+ }
+
+}
Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec;
+
+import java.io.IOException;
+
+/**
+ * A HawtDispatch protocol codec that encodes/decodes AMQP 1.0 frames.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpProtocolCodec extends AbstractProtocolCodec {
+
+ int maxFrameSize = 4*1024*1024;
+
+ @Override
+ protected void encode(Object object) throws IOException {
+ nextWriteBuffer.write((Buffer) object);
+ }
+
+ @Override
+ protected Action initialDecodeAction() {
+ return new Action() {
+ public Object apply() throws IOException {
+ Buffer magic = readBytes(8);
+ if (magic != null) {
+ nextDecodeAction = readFrameSize;
+ return new AmqpHeader(magic);
+ } else {
+ return null;
+ }
+ }
+ };
+ }
+
+ private final Action readFrameSize = new Action() {
+ public Object apply() throws IOException {
+ Buffer sizeBytes = peekBytes(4);
+ if (sizeBytes != null) {
+ int size = sizeBytes.bigEndianEditor().readInt();
+ if (size < 8) {
+ throw new IOException(String.format("specified frame size %d is smaller than minimum frame size", size));
+ }
+ if( size > maxFrameSize ) {
+ throw new IOException(String.format("specified frame size %d is larger than maximum frame size", size));
+ }
+
+ // TODO: check frame min and max size..
+ nextDecodeAction = readFrame(size);
+ return nextDecodeAction.apply();
+ } else {
+ return null;
+ }
+ }
+ };
+
+
+ private final Action readFrame(final int size) {
+ return new Action() {
+ public Object apply() throws IOException {
+ Buffer frameData = readBytes(size);
+ if (frameData != null) {
+ nextDecodeAction = readFrameSize;
+ return frameData;
+ } else {
+ return null;
+ }
+ }
+ };
+ }
+
+ public int getReadBytesPendingDecode() {
+ return readBuffer.position() - readStart;
+ }
+
+ public void skipProtocolHeader() {
+ nextDecodeAction = readFrameSize;
+ }
+
+ public void readProtocolHeader() {
+ nextDecodeAction = initialDecodeAction();
+ }
+
+ public int getMaxFrameSize() {
+ return maxFrameSize;
+ }
+
+ public void setMaxFrameSize(int maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org