You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/05/03 19:47:27 UTC
[3/3] qpid-proton git commit: PROTON-1188,
PROTON-1189: remove stale contrib/proton-jms and
contrib/proton-hawtdispatch modules
PROTON-1188, PROTON-1189: remove stale contrib/proton-jms and contrib/proton-hawtdispatch modules
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/564a4d02
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/564a4d02
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/564a4d02
Branch: refs/heads/master
Commit: 564a4d024a42df7911b87a3ae7a3cdc43ac8194f
Parents: 99a9056
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue May 3 18:44:59 2016 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue May 3 18:46:48 2016 +0100
----------------------------------------------------------------------
contrib/proton-hawtdispatch/pom.xml | 61 --
.../hawtdispatch/api/AmqpConnectOptions.java | 228 --------
.../proton/hawtdispatch/api/AmqpConnection.java | 201 -------
.../hawtdispatch/api/AmqpDeliveryListener.java | 32 -
.../hawtdispatch/api/AmqpEndpointBase.java | 158 -----
.../qpid/proton/hawtdispatch/api/AmqpLink.java | 27 -
.../proton/hawtdispatch/api/AmqpReceiver.java | 141 -----
.../proton/hawtdispatch/api/AmqpSender.java | 227 -------
.../proton/hawtdispatch/api/AmqpSession.java | 141 -----
.../qpid/proton/hawtdispatch/api/Callback.java | 29 -
.../hawtdispatch/api/ChainedCallback.java | 37 --
.../hawtdispatch/api/DeliveryAttachment.java | 27 -
.../qpid/proton/hawtdispatch/api/Future.java | 31 -
.../hawtdispatch/api/MessageDelivery.java | 226 -------
.../qpid/proton/hawtdispatch/api/Promise.java | 107 ----
.../qpid/proton/hawtdispatch/api/QoS.java | 26 -
.../proton/hawtdispatch/api/TransportState.java | 29 -
.../proton/hawtdispatch/impl/AmqpHeader.java | 85 ---
.../proton/hawtdispatch/impl/AmqpListener.java | 71 ---
.../hawtdispatch/impl/AmqpProtocolCodec.java | 109 ----
.../proton/hawtdispatch/impl/AmqpTransport.java | 586 -------------------
.../qpid/proton/hawtdispatch/impl/Defer.java | 27 -
.../hawtdispatch/impl/EndpointContext.java | 76 ---
.../qpid/proton/hawtdispatch/impl/Support.java | 41 --
.../qpid/proton/hawtdispatch/impl/Watch.java | 26 -
.../proton/hawtdispatch/impl/WatchBase.java | 54 --
.../proton/hawtdispatch/api/SampleTest.java | 308 ----------
.../hawtdispatch/test/MessengerServer.java | 151 -----
contrib/proton-jms/pom.xml | 50 --
.../jms/AMQPNativeInboundTransformer.java | 40 --
.../jms/AMQPNativeOutboundTransformer.java | 111 ----
.../proton/jms/AMQPRawInboundTransformer.java | 47 --
.../proton/jms/AutoOutboundTransformer.java | 49 --
.../apache/qpid/proton/jms/EncodedMessage.java | 75 ---
.../qpid/proton/jms/InboundTransformer.java | 314 ----------
.../jms/JMSMappingInboundTransformer.java | 102 ----
.../jms/JMSMappingOutboundTransformer.java | 243 --------
.../org/apache/qpid/proton/jms/JMSVendor.java | 61 --
.../qpid/proton/jms/OutboundTransformer.java | 82 ---
.../jms/JMSMappingInboundTransformerTest.java | 214 -------
.../jms/JMSMappingOutboundTransformerTest.java | 226 -------
pom.xml | 2 -
42 files changed, 4878 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/pom.xml b/contrib/proton-hawtdispatch/pom.xml
deleted file mode 100644
index bffcad4..0000000
--- a/contrib/proton-hawtdispatch/pom.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <parent>
- <groupId>org.apache.qpid</groupId>
- <artifactId>proton-project</artifactId>
- <version>0.13.0-SNAPSHOT</version>
- <relativePath>../..</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>proton-hawtdispatch</artifactId>
- <name>proton-hawtdispatch</name>
-
- <properties>
- <hawtbuf-version>1.9</hawtbuf-version>
- <hawtdispatch-version>1.18</hawtdispatch-version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>proton-j</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.fusesource.hawtdispatch</groupId>
- <artifactId>hawtdispatch-transport</artifactId>
- <version>${hawtdispatch-version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.fusesource.hawtbuf</groupId>
- <artifactId>hawtbuf</artifactId>
- <version>${hawtbuf-version}</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-
- <build>
- </build>
- <scm>
- <url>http://svn.apache.org/viewvc/qpid/proton/</url>
- </scm>
-
-</project>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
deleted file mode 100644
index 3c3543d..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.transport.TcpTransport;
-
-import javax.net.ssl.SSLContext;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.*;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpConnectOptions implements Cloneable {
-
- private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("amqp.thread.keep_alive", ""+1000));
- private static final long STACK_SIZE = Long.parseLong(System.getProperty("amqp.thread.stack_size", ""+1024*512));
- private static ThreadPoolExecutor blockingThreadPool;
-
- public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
- if( blockingThreadPool == null ) {
- blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
- public Thread newThread(Runnable r) {
- Thread rc = new Thread(null, r, "AMQP Task", STACK_SIZE);
- rc.setDaemon(true);
- return rc;
- }
- }) {
-
- @Override
- public void shutdown() {
- // we don't ever shutdown since we are shared..
- }
-
- @Override
- public List<Runnable> shutdownNow() {
- // we don't ever shutdown since we are shared..
- return Collections.emptyList();
- }
- };
- }
- return blockingThreadPool;
- }
- public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
- blockingThreadPool = pool;
- }
-
- private static final URI DEFAULT_HOST;
- static {
- try {
- DEFAULT_HOST = new URI("tcp://localhost");
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
- }
-
- URI host = DEFAULT_HOST;
- URI localAddress;
- SSLContext sslContext;
- DispatchQueue dispatchQueue;
- Executor blockingExecutor;
- int maxReadRate;
- int maxWriteRate;
- int trafficClass = TcpTransport.IPTOS_THROUGHPUT;
- boolean useLocalHost;
- int receiveBufferSize = 1024*64;
- int sendBufferSize = 1024*64;
- String localContainerId;
- String remoteContainerId;
- String user;
- String password;
-
-
- @Override
- public AmqpConnectOptions clone() {
- try {
- return (AmqpConnectOptions) super.clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException(e);
- }
- }
-
- public String getLocalContainerId() {
- return localContainerId;
- }
-
- public void setLocalContainerId(String localContainerId) {
- this.localContainerId = localContainerId;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getRemoteContainerId() {
- return remoteContainerId;
- }
-
- public void setRemoteContainerId(String remoteContainerId) {
- this.remoteContainerId = remoteContainerId;
- }
-
- public String getUser() {
- return user;
- }
-
- public void setUser(String user) {
- this.user = user;
- }
-
- public Executor getBlockingExecutor() {
- return blockingExecutor;
- }
-
- public void setBlockingExecutor(Executor blockingExecutor) {
- this.blockingExecutor = blockingExecutor;
- }
-
- public DispatchQueue getDispatchQueue() {
- return dispatchQueue;
- }
-
- public void setDispatchQueue(DispatchQueue dispatchQueue) {
- this.dispatchQueue = dispatchQueue;
- }
-
- public URI getLocalAddress() {
- return localAddress;
- }
-
- public void setLocalAddress(String localAddress) throws URISyntaxException {
- this.setLocalAddress(new URI(localAddress));
- }
- public void setLocalAddress(URI localAddress) {
- this.localAddress = localAddress;
- }
-
- public int getMaxReadRate() {
- return maxReadRate;
- }
-
- public void setMaxReadRate(int maxReadRate) {
- this.maxReadRate = maxReadRate;
- }
-
- public int getMaxWriteRate() {
- return maxWriteRate;
- }
-
- public void setMaxWriteRate(int maxWriteRate) {
- this.maxWriteRate = maxWriteRate;
- }
-
- public int getReceiveBufferSize() {
- return receiveBufferSize;
- }
-
- public void setReceiveBufferSize(int receiveBufferSize) {
- this.receiveBufferSize = receiveBufferSize;
- }
-
- public URI getHost() {
- return host;
- }
- public void setHost(String host, int port) throws URISyntaxException {
- this.setHost(new URI("tcp://"+host+":"+port));
- }
- public void setHost(String host) throws URISyntaxException {
- this.setHost(new URI(host));
- }
- public void setHost(URI host) {
- this.host = host;
- }
-
- public int getSendBufferSize() {
- return sendBufferSize;
- }
-
- public void setSendBufferSize(int sendBufferSize) {
- this.sendBufferSize = sendBufferSize;
- }
-
- public SSLContext getSslContext() {
- return sslContext;
- }
-
- public void setSslContext(SSLContext sslContext) {
- this.sslContext = sslContext;
- }
-
- public int getTrafficClass() {
- return trafficClass;
- }
-
- public void setTrafficClass(int trafficClass) {
- this.trafficClass = trafficClass;
- }
-
- public boolean isUseLocalHost() {
- return useLocalHost;
- }
-
- public void setUseLocalHost(boolean useLocalHost) {
- this.useLocalHost = useLocalHost;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
deleted file mode 100644
index b308209..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.hawtdispatch.impl.AmqpListener;
-import org.apache.qpid.proton.hawtdispatch.impl.AmqpTransport;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.ProtonJConnection;
-import org.apache.qpid.proton.engine.ProtonJSession;
-import org.apache.qpid.proton.engine.impl.ProtocolTracer;
-import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.Task;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpConnection extends AmqpEndpointBase {
-
- AmqpTransport transport;
- ProtonJConnection connection;
- HashSet<AmqpSender> senders = new HashSet<AmqpSender>();
- boolean closing = false;
-
- public static AmqpConnection connect(AmqpConnectOptions options) {
- return new AmqpConnection(options);
- }
-
- private AmqpConnection(AmqpConnectOptions options) {
- transport = AmqpTransport.connect(options);
- transport.setListener(new AmqpListener() {
- @Override
- public void processDelivery(Delivery delivery) {
- Attachment attachment = (Attachment) getTransport().context(delivery.getLink()).getAttachment();
- AmqpLink link = (AmqpLink) attachment.endpoint();
- link.processDelivery(delivery);
- }
-
- @Override
- public void processRefill() {
- for(AmqpSender sender: new ArrayList<AmqpSender>(senders)) {
- sender.pumpDeliveries();
- }
- pumpOut();
- }
-
- public void processTransportFailure(final IOException e) {
- }
- });
- connection = transport.connection();
- connection.open();
- attach();
- }
-
- public void waitForConnected() throws Exception {
- assertNotOnDispatchQueue();
- getConnectedFuture().await();
- }
-
- public Future<Void> getConnectedFuture() {
- final Promise<Void> rc = new Promise<Void>();
- queue().execute(new Task() {
- @Override
- public void run() {
- onConnected(rc);
- }
- });
- return rc;
- }
-
- public void onConnected(Callback<Void> cb) {
- transport.onTransportConnected(cb);
- }
-
- @Override
- protected Endpoint getEndpoint() {
- return connection;
- }
-
- @Override
- protected AmqpConnection getConnection() {
- return this;
- }
-
- @Override
- protected AmqpEndpointBase getParent() {
- return null;
- }
-
- public AmqpSession createSession() {
- assertExecuting();
- ProtonJSession session = connection.session();
- session.open();
- pumpOut();
- return new AmqpSession(this, session);
- }
-
- public int getMaxSessions() {
- return connection.getMaxChannels();
- }
-
- public void disconnect() {
- closing = true;
- transport.disconnect();
- }
-
- public void waitForDisconnected() throws Exception {
- assertNotOnDispatchQueue();
- getDisconnectedFuture().await();
- }
-
- public Future<Void> getDisconnectedFuture() {
- final Promise<Void> rc = new Promise<Void>();
- queue().execute(new Task() {
- @Override
- public void run() {
- onDisconnected(rc);
- }
- });
- return rc;
- }
-
- public void onDisconnected(Callback<Void> cb) {
- transport.onTransportDisconnected(cb);
- }
-
- public TransportState getTransportState() {
- return transport.getState();
- }
-
- public Throwable getTransportFailure() {
- return transport.getFailure();
- }
-
- public Future<Throwable> getTransportFailureFuture() {
- final Promise<Throwable> rc = new Promise<Throwable>();
- queue().execute(new Task() {
- @Override
- public void run() {
- onTransportFailure(rc);
- }
- });
- return rc;
- }
-
- public void onTransportFailure(Callback<Throwable> cb) {
- transport.onTransportFailure(cb);
- }
-
- @Override
- public DispatchQueue queue() {
- return super.queue();
- }
-
- public void setProtocolTracer(ProtocolTracer protocolTracer) {
- transport.setProtocolTracer(protocolTracer);
- }
-
- public ProtocolTracer getProtocolTracer() {
- return transport.getProtocolTracer();
- }
-
- /**
- * Once the remote end, closes the transport is disconnected.
- */
- @Override
- public void close() {
- super.close();
- onRemoteClose(new Callback<ErrorCondition>() {
- @Override
- public void onSuccess(ErrorCondition value) {
- disconnect();
- }
-
- @Override
- public void onFailure(Throwable value) {
- disconnect();
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
deleted file mode 100644
index 1e9f4e2..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public interface AmqpDeliveryListener {
-
- /**
- * Caller should suspend/resume the AmqpReceiver to
- * flow control the delivery of messages.
- *
- * @param delivery
- */
- void onMessageDelivery(MessageDelivery delivery);
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
deleted file mode 100644
index 4ebd8e2..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.hawtdispatch.impl.*;
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.fusesource.hawtdispatch.Dispatch;
-import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.Task;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract class AmqpEndpointBase extends WatchBase {
- abstract protected Endpoint getEndpoint();
- abstract protected AmqpEndpointBase getParent();
-
- protected AmqpConnection getConnection() {
- return getParent().getConnection();
- }
-
- protected AmqpTransport getTransport() {
- return getConnection().transport;
- }
-
- protected DispatchQueue queue() {
- return getTransport().queue();
- }
-
- protected void assertExecuting() {
- getTransport().assertExecuting();
- }
-
- public void waitForRemoteOpen() throws Exception {
- assertNotOnDispatchQueue();
- getRemoteOpenFuture().await();
- }
-
- public Future<Void> getRemoteOpenFuture() {
- final Promise<Void> rc = new Promise<Void>();
- queue().execute(new Task() {
- @Override
- public void run() {
- onRemoteOpen(rc);
- }
- });
- return rc;
- }
-
- public void onRemoteOpen(final Callback<Void> cb) {
- addWatch(new Watch() {
- @Override
- public boolean execute() {
- switch (getEndpoint().getRemoteState()) {
- case ACTIVE:
- cb.onSuccess(null);
- return true;
- case CLOSED:
- cb.onFailure(Support.illegalState("closed"));
- return true;
- }
- return false;
- }
- });
- }
-
- public ErrorCondition waitForRemoteClose() throws Exception {
- assertNotOnDispatchQueue();
- return getRemoteCloseFuture().await();
- }
-
- public Future<ErrorCondition> getRemoteCloseFuture() {
- final Promise<ErrorCondition> rc = new Promise<ErrorCondition>();
- queue().execute(new Task() {
- @Override
- public void run() {
- onRemoteClose(rc);
- }
- });
- return rc;
- }
-
- public void onRemoteClose(final Callback<ErrorCondition> cb) {
- addWatch(new Watch() {
- @Override
- public boolean execute() {
- if (getEndpoint().getRemoteState() == EndpointState.CLOSED) {
- cb.onSuccess(getEndpoint().getRemoteCondition());
- return true;
- }
- return false;
- }
- });
- }
-
- public void close() {
- getEndpoint().close();
- pumpOut();
- }
-
- public EndpointState getRemoteState() {
- return getEndpoint().getRemoteState();
- }
-
- public ErrorCondition getRemoteError() {
- return getEndpoint().getRemoteCondition();
- }
-
- static protected ErrorCondition toError(Throwable value) {
- return new ErrorCondition(Symbol.valueOf("error"), value.toString());
- }
-
- class Attachment extends Task {
- AmqpEndpointBase endpoint() {
- return AmqpEndpointBase.this;
- }
-
- @Override
- public void run() {
- fireWatches();
- }
- }
-
- protected void attach() {
- getTransport().context(getEndpoint()).setAttachment(new Attachment());
- }
-
- protected void defer(Defer defer) {
- getTransport().defer(defer);
- }
-
- protected void pumpOut() {
- getTransport().pumpOut();
- }
-
- static protected void assertNotOnDispatchQueue() {
- assert Dispatch.getCurrentQueue()==null : "Not allowed to be called when executing on a dispatch queue";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
deleted file mode 100644
index dd6f32e..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.engine.Delivery;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract public class AmqpLink extends AmqpEndpointBase {
- abstract protected void processDelivery(Delivery delivery);
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
deleted file mode 100644
index 644f72a..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.hawtdispatch.impl.Defer;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.ByteArrayOutputStream;
-
-import java.util.LinkedList;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpReceiver extends AmqpLink {
-
- final AmqpSession parent;
- final Receiver receiver;
-
- public AmqpReceiver(AmqpSession parent, Receiver receiver2, QoS qos) {
- this.parent = parent;
- this.receiver = receiver2;
- attach();
- }
-
- @Override
- protected Receiver getEndpoint() {
- return receiver;
- }
- @Override
- protected AmqpSession getParent() {
- return parent;
- }
-
- ByteArrayOutputStream current = new ByteArrayOutputStream();
-
- @Override
- protected void processDelivery(Delivery delivery) {
- if( !delivery.isReadable() ) {
- System.out.println("it was not readable!");
- return;
- }
-
- if( current==null ) {
- current = new ByteArrayOutputStream();
- }
-
- int count;
- byte data[] = new byte[1024*4];
- while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
- current.write(data, 0, count);
- }
-
- // Expecting more deliveries..
- if( count == 0 ) {
- return;
- }
-
- receiver.advance();
- Buffer buffer = current.toBuffer();
- current = null;
- onMessage(delivery, buffer);
-
- }
-
- LinkedList<MessageDelivery> inbound = new LinkedList<MessageDelivery>();
-
- protected void onMessage(Delivery delivery, Buffer buffer) {
- MessageDelivery md = new MessageDelivery(buffer) {
- @Override
- AmqpLink link() {
- return AmqpReceiver.this;
- }
-
- @Override
- public void settle() {
- if( !delivery.isSettled() ) {
- delivery.disposition(new Accepted());
- delivery.settle();
- }
- drain();
- }
- };
- md.delivery = delivery;
- delivery.setContext(md);
- inbound.add(md);
- drainInbound();
- }
-
- public void drain() {
- defer(deferedDrain);
- }
-
- Defer deferedDrain = new Defer(){
- public void run() {
- drainInbound();
- }
- };
- int resumed = 0;
-
- public void resume() {
- resumed++;
- }
- public void suspend() {
- resumed--;
- }
-
- AmqpDeliveryListener deliveryListener;
- private void drainInbound() {
- while( deliveryListener!=null && !inbound.isEmpty() && resumed>0) {
- deliveryListener.onMessageDelivery(inbound.removeFirst());
- receiver.flow(1);
- }
- }
-
- public AmqpDeliveryListener getDeliveryListener() {
- return deliveryListener;
- }
-
- public void setDeliveryListener(AmqpDeliveryListener deliveryListener) {
- this.deliveryListener = deliveryListener;
- drainInbound();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
deleted file mode 100644
index 9a672d5..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.hawtdispatch.impl.Defer;
-import org.apache.qpid.proton.hawtdispatch.impl.Watch;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.fusesource.hawtbuf.Buffer;
-
-import java.io.UnsupportedEncodingException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpSender extends AmqpLink {
-
- private byte[] EMPTY_BYTE_ARRAY = new byte[]{};
- long nextTagId = 0;
- HashSet<byte[]> tagCache = new HashSet<byte[]>();
-
- final AmqpSession parent;
- private final QoS qos;
- final Sender sender;
-
- public AmqpSender(AmqpSession parent, Sender sender2, QoS qos) {
- this.parent = parent;
- this.sender = sender2;
- this.qos = qos;
- attach();
- getConnection().senders.add(this);
- }
-
- @Override
- public void close() {
- super.close();
- getConnection().senders.remove(this);
- }
-
- @Override
- protected Sender getEndpoint() {
- return sender;
- }
-
- @Override
- protected AmqpSession getParent() {
- return parent;
- }
-
- final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
- long outboundBufferSize;
-
- public MessageDelivery send(Message message) {
- assertExecuting();
- MessageDelivery rc = new MessageDelivery(message) {
- @Override
- AmqpLink link() {
- return AmqpSender.this;
- }
-
- @Override
- public void redeliver(boolean incrementDeliveryCounter) {
- super.redeliver(incrementDeliveryCounter);
- outbound.add(this);
- outboundBufferSize += initialSize;
- defer(deferedPumpDeliveries);
- }
- };
- outbound.add(rc);
- outboundBufferSize += rc.initialSize;
- pumpDeliveries();
- pumpOut();
- return rc;
- }
-
- Buffer currentBuffer;
- Delivery currentDelivery;
-
- Defer deferedPumpDeliveries = new Defer() {
- public void run() {
- pumpDeliveries();
- }
- };
-
- public long getOverflowBufferSize() {
- return outboundBufferSize;
- }
-
- protected void pumpDeliveries() {
- assertExecuting();
- try {
- while(true) {
- while( currentBuffer !=null ) {
- if( sender.getCredit() > 0 ) {
- int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
- currentBuffer.moveHead(sent);
- if( currentBuffer.length == 0 ) {
- Delivery current = currentDelivery;
- MessageDelivery md = (MessageDelivery) current.getContext();
- currentBuffer = null;
- currentDelivery = null;
- if( qos == QoS.AT_MOST_ONCE ) {
- current.settle();
- } else {
- sender.advance();
- }
- md.fireWatches();
- }
- } else {
- return;
- }
- }
-
- if( outbound.isEmpty() ) {
- return;
- }
-
- final MessageDelivery md = outbound.removeFirst();
- outboundBufferSize -= md.initialSize;
- currentBuffer = md.encoded();
- if( qos == QoS.AT_MOST_ONCE ) {
- currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
- } else {
- final byte[] tag = nextTag();
- currentDelivery = sender.delivery(tag, 0, tag.length);
- }
- md.delivery = currentDelivery;
- currentDelivery.setContext(md);
- }
- } finally {
- fireWatches();
- }
- }
-
- @Override
- protected void processDelivery(Delivery delivery) {
- final MessageDelivery md = (MessageDelivery) delivery.getContext();
- if( delivery.remotelySettled() ) {
- if( delivery.getTag().length > 0 ) {
- checkinTag(delivery.getTag());
- }
-
- final DeliveryState state = delivery.getRemoteState();
- if( state==null || state instanceof Accepted) {
- if( !delivery.remotelySettled() ) {
- delivery.disposition(new Accepted());
- }
- } else if( state instanceof Rejected) {
- // re-deliver /w incremented delivery counter.
- md.delivery = null;
- md.incrementDeliveryCount();
- outbound.addLast(md);
- } else if( state instanceof Released) {
- // re-deliver && don't increment the counter.
- md.delivery = null;
- outbound.addLast(md);
- } else if( state instanceof Modified) {
- Modified modified = (Modified) state;
- if ( modified.getDeliveryFailed() ) {
- // increment delivery counter..
- md.incrementDeliveryCount();
- }
- }
- delivery.settle();
- }
- md.fireWatches();
- }
-
- byte[] nextTag() {
- byte[] rc;
- if (tagCache != null && !tagCache.isEmpty()) {
- final Iterator<byte[]> iterator = tagCache.iterator();
- rc = iterator.next();
- iterator.remove();
- } else {
- try {
- rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
- return rc;
- }
-
- void checkinTag(byte[] data) {
- if( tagCache.size() < 1024 ) {
- tagCache.add(data);
- }
- }
-
- public void onOverflowBufferDrained(final Callback<Void> cb) {
- addWatch(new Watch() {
- @Override
- public boolean execute() {
- if (outboundBufferSize==0) {
- cb.onSuccess(null);
- return true;
- }
- return false;
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
deleted file mode 100644
index b25a1b7..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.ProtonJSession;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.messaging.*;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-
-import java.util.UUID;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpSession extends AmqpEndpointBase {
-
- final AmqpConnection parent;
- final ProtonJSession session;
-
-
- public AmqpSession(AmqpConnection parent, ProtonJSession session) {
- this.parent = parent;
- this.session = session;
- attach();
- }
-
- @Override
- protected Endpoint getEndpoint() {
- return session;
- }
-
- @Override
- protected AmqpConnection getParent() {
- return parent;
- }
-
- public AmqpSender createSender(Target target) {
- return createSender(target, QoS.AT_LEAST_ONCE);
- }
-
- public AmqpSender createSender(Target target, QoS qos) {
- return createSender(target, qos, UUID.randomUUID().toString());
- }
-
- public AmqpSender createSender(Target target, QoS qos, String name) {
- assertExecuting();
- Sender sender = session.sender(name);
- attach();
-// Source source = new Source();
-// source.setAddress(UUID.randomUUID().toString());
-// sender.setSource(source);
- sender.setTarget(target);
- configureQos(sender, qos);
- sender.open();
- pumpOut();
- return new AmqpSender(this, sender, qos);
- }
-
- public AmqpReceiver createReceiver(Source source) {
- return createReceiver(source, QoS.AT_LEAST_ONCE);
- }
-
- public AmqpReceiver createReceiver(Source source, QoS qos) {
- return createReceiver(source, qos, 100);
- }
-
- public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch) {
- return createReceiver(source, qos, prefetch, UUID.randomUUID().toString());
- }
-
- public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch, String name) {
- assertExecuting();
- Receiver receiver = session.receiver(name);
- receiver.setSource(source);
-// Target target = new Target();
-// target.setAddress(UUID.randomUUID().toString());
-// receiver.setTarget(target);
- receiver.flow(prefetch);
- configureQos(receiver, qos);
- receiver.open();
- pumpOut();
- return new AmqpReceiver(this, receiver, qos);
- }
-
- private void configureQos(Link link, QoS qos) {
- switch (qos) {
- case AT_MOST_ONCE:
- link.setSenderSettleMode(SenderSettleMode.SETTLED);
- link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
- break;
- case AT_LEAST_ONCE:
- link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
- link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
- break;
- case EXACTLY_ONCE:
- link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
- link.setReceiverSettleMode(ReceiverSettleMode.SECOND);
- break;
- }
- }
-
- public Message createTextMessage(String value) {
- Message msg = Message.Factory.create();
- Section body = new AmqpValue(value);
- msg.setBody(body);
- return msg;
- }
-
- public Message createBinaryMessage(byte value[]) {
- return createBinaryMessage(value, 0, value.length);
- }
-
- public Message createBinaryMessage(byte value[], int offset, int len) {
- Message msg = Message.Factory.create();
- Data body = new Data(new Binary(value, offset,len));
- msg.setBody(body);
- return msg;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
deleted file mode 100644
index 89fbdd1..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-/**
- * <p>
- * Function Result that carries one value.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public interface Callback<T> {
- public void onSuccess(T value);
- public void onFailure(Throwable value);
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
deleted file mode 100644
index e53f512..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-/**
- * <p>
- * Function Result that carries one value.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract public class ChainedCallback<In,Out> implements Callback<In> {
-
- public final Callback<Out> next;
-
- public ChainedCallback(Callback<Out> next) {
- this.next = next;
- }
-
- public void onFailure(Throwable value) {
- next.onFailure(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
deleted file mode 100644
index 290076f..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.engine.Delivery;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract public class DeliveryAttachment {
- abstract void processDelivery(Delivery delivery);
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
deleted file mode 100644
index 4a9eb5e..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * <p>A simplified Future function results interface.</p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public interface Future<T> {
- T await() throws Exception;
- T await(long amount, TimeUnit unit) throws Exception;
- void then(Callback<T> callback);
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
deleted file mode 100644
index b115557..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.hawtdispatch.impl.Watch;
-import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtdispatch.Task;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public abstract class MessageDelivery extends WatchBase {
-
- final int initialSize;
- private Message message;
- private Buffer encoded;
- public Delivery delivery;
- private int sizeHint = 32;
-
- static Buffer encode(Message message, int sizeHint) {
- byte[] buffer = new byte[sizeHint];
- int size = ((ProtonJMessage)message).encode2(buffer, 0, sizeHint);
- if( size > sizeHint ) {
- buffer = new byte[size];
- size = message.encode(buffer, 0, size);
- }
- return new Buffer(buffer, 0, size);
- }
-
- static Message decode(Buffer buffer) {
- Message msg = Message.Factory.create();
- int offset = buffer.offset;
- int len = buffer.length;
- while( len > 0 ) {
- int decoded = msg.decode(buffer.data, offset, len);
- assert decoded > 0: "Make progress decoding the message";
- offset += decoded;
- len -= decoded;
- }
- return msg;
- }
-
- public MessageDelivery(Message message) {
- this(message, encode(message, 32));
- }
-
- public MessageDelivery(Buffer encoded) {
- this(null, encoded);
- }
-
- public MessageDelivery(Message message, Buffer encoded) {
- this.message = message;
- this.encoded = encoded;
- sizeHint = this.encoded.length;
- initialSize = sizeHint;
- }
-
- public Message getMessage() {
- if( message == null ) {
- message = decode(encoded);
- }
- return message;
- }
-
- public Buffer encoded() {
- if( encoded == null ) {
- encoded = encode(message, sizeHint);
- sizeHint = encoded.length;
- }
- return encoded;
- }
-
- public boolean isSettled() {
- return delivery!=null && delivery.isSettled();
- }
-
- public DeliveryState getRemoteState() {
- return delivery==null ? null : delivery.getRemoteState();
- }
-
- public DeliveryState getLocalState() {
- return delivery==null ? null : delivery.getLocalState();
- }
-
- public void onEncoded(final Callback<Void> cb) {
- addWatch(new Watch() {
- @Override
- public boolean execute() {
- if( delivery!=null ) {
- cb.onSuccess(null);
- return true;
- }
- return false;
- }
- });
- }
-
- /**
- * @return the remote delivery state when it changes.
- * @throws Exception
- */
- public DeliveryState getRemoteStateChange() throws Exception {
- AmqpEndpointBase.assertNotOnDispatchQueue();
- return getRemoteStateChangeFuture().await();
- }
-
- /**
- * @return the future remote delivery state when it changes.
- */
- public Future<DeliveryState> getRemoteStateChangeFuture() {
- final Promise<DeliveryState> rc = new Promise<DeliveryState>();
- link().queue().execute(new Task() {
- @Override
- public void run() {
- onRemoteStateChange(rc);
- }
- });
- return rc;
- }
-
- abstract AmqpLink link();
-
- boolean watchingRemoteStateChange;
- public void onRemoteStateChange(final Callback<DeliveryState> cb) {
- watchingRemoteStateChange = true;
- final DeliveryState original = delivery.getRemoteState();
- addWatch(new Watch() {
- @Override
- public boolean execute() {
- if (original == null) {
- if( delivery.getRemoteState()!=null ) {
- cb.onSuccess(delivery.getRemoteState());
- watchingRemoteStateChange = false;
- return true;
- }
- } else {
- if( !original.equals(delivery.getRemoteState()) ) {
- cb.onSuccess(delivery.getRemoteState());
- watchingRemoteStateChange = false;
- return true;
- }
- }
- return false;
- }
- });
- }
-
- /**
- * @return the remote delivery state once settled.
- * @throws Exception
- */
- public DeliveryState getSettle() throws Exception {
- AmqpEndpointBase.assertNotOnDispatchQueue();
- return getSettleFuture().await();
- }
-
- /**
- * @return the future remote delivery state once the delivery is settled.
- */
- public Future<DeliveryState> getSettleFuture() {
- final Promise<DeliveryState> rc = new Promise<DeliveryState>();
- link().queue().execute(new Task() {
- @Override
- public void run() {
- onSettle(rc);
- }
- });
- return rc;
- }
-
- public void onSettle(final Callback<DeliveryState> cb) {
- addWatch(new Watch() {
- @Override
- public boolean execute() {
- if( delivery!=null && delivery.isSettled() ) {
- cb.onSuccess(delivery.getRemoteState());
- return true;
- }
- return false;
- }
- });
- }
-
- @Override
- protected void fireWatches() {
- super.fireWatches();
- }
-
- void incrementDeliveryCount() {
- Message msg = getMessage();
- msg.setDeliveryCount(msg.getDeliveryCount()+1);
- encoded = null;
- }
-
- public void redeliver(boolean incrementDeliveryCounter) {
- if( incrementDeliveryCounter ) {
- incrementDeliveryCount();
- }
- }
-
- public void settle() {
- if( !delivery.isSettled() ) {
- delivery.settle();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
deleted file mode 100644
index b914b44..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class Promise<T> implements Callback<T>, Future<T> {
-
- private final CountDownLatch latch = new CountDownLatch(1);
- Callback<T> next;
- Throwable error;
- T value;
-
- public void onFailure(Throwable value) {
- Callback<T> callback = null;
- synchronized(this) {
- error = value;
- latch.countDown();
- callback = next;
- }
- if( callback!=null ) {
- callback.onFailure(value);
- }
- }
-
- public void onSuccess(T value) {
- Callback<T> callback = null;
- synchronized(this) {
- this.value = value;
- latch.countDown();
- callback = next;
- }
- if( callback!=null ) {
- callback.onSuccess(value);
- }
- }
-
- public void then(Callback<T> callback) {
- boolean fire = false;
- synchronized(this) {
- next = callback;
- if( latch.getCount() == 0 ) {
- fire = true;
- }
- }
- if( fire ) {
- if( error!=null ) {
- callback.onFailure(error);
- } else {
- callback.onSuccess(value);
- }
- }
- }
-
- public T await(long amount, TimeUnit unit) throws Exception {
- if( latch.await(amount, unit) ) {
- return get();
- } else {
- throw new TimeoutException();
- }
- }
-
- public T await() throws Exception {
- latch.await();
- return get();
- }
-
- private T get() throws Exception {
- Throwable e = error;
- if( e !=null ) {
- if( e instanceof RuntimeException ) {
- throw (RuntimeException) e;
- } else if( e instanceof Exception) {
- throw (Exception) e;
- } else if( e instanceof Error) {
- throw (Error) e;
- } else {
- // don't expect to hit this case.
- throw new RuntimeException(e);
- }
- }
- return value;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
deleted file mode 100644
index 5b4a8dc..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public enum QoS {
- AT_MOST_ONCE,
- AT_LEAST_ONCE,
- EXACTLY_ONCE
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
deleted file mode 100644
index 4ebf21a..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.api;
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public enum TransportState {
- CREATED,
- CONNECTING,
- CONNECTED,
- DISCONNECTING,
- DISCONNECTED
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
deleted file mode 100644
index de8a2cd..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.fusesource.hawtbuf.Buffer;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpHeader {
-
- static final Buffer PREFIX = new Buffer(new byte[]{
- 'A', 'M', 'Q', 'P'
- });
-
- private Buffer buffer;
-
- public AmqpHeader(){
- this(new Buffer(new byte[]{
- 'A', 'M', 'Q', 'P', 0, 1, 0, 0
- }));
- }
-
- public AmqpHeader(Buffer buffer){
- setBuffer(buffer);
- }
-
- public int getProtocolId() {
- return buffer.get(4) & 0xFF;
- }
- public void setProtocolId(int value) {
- buffer.data[buffer.offset+4] = (byte) value;
- }
-
- public int getMajor() {
- return buffer.get(5) & 0xFF;
- }
- public void setMajor(int value) {
- buffer.data[buffer.offset+5] = (byte) value;
- }
-
- public int getMinor() {
- return buffer.get(6) & 0xFF;
- }
- public void setMinor(int value) {
- buffer.data[buffer.offset+6] = (byte) value;
- }
-
- public int getRevision() {
- return buffer.get(7) & 0xFF;
- }
- public void setRevision(int value) {
- buffer.data[buffer.offset+7] = (byte) value;
- }
-
- public Buffer getBuffer() {
- return buffer;
- }
- public void setBuffer(Buffer value) {
- if( !value.startsWith(PREFIX) || value.length()!=8 ) {
- throw new IllegalArgumentException("Not an AMQP header buffer");
- }
- buffer = value.buffer();
- }
-
-
- @Override
- public String toString() {
- return buffer.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
deleted file mode 100644
index f372d99..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.*;
-import org.fusesource.hawtdispatch.Task;
-
-import java.io.IOException;
-
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class AmqpListener {
-
- public Sasl processSaslConnect(ProtonJTransport protonTransport) {
- return null;
- }
-
- public Sasl processSaslEvent(Sasl sasl) {
- return sasl;
- }
-
- public void processRemoteOpen(Endpoint endpoint, Task onComplete) {
- ErrorCondition condition = endpoint.getCondition();
- condition.setCondition(Symbol.valueOf("error"));
- condition.setDescription("Not supported");
- endpoint.close();
- onComplete.run();
- }
-
- public void processRemoteClose(Endpoint endpoint, Task onComplete) {
- endpoint.close();
- onComplete.run();
- }
-
- public void processDelivery(Delivery delivery){
- }
-
- public void processTransportConnected() {
- }
-
- public void processTransportFailure(IOException e) {
- this.processFailure(e);
- }
-
- public void processFailure(Throwable e) {
- e.printStackTrace();
- }
-
- public void processRefill() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
deleted file mode 100644
index 13ed1e3..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec;
-
-import java.io.IOException;
-
-/**
- * A HawtDispatch protocol codec that encodes/decodes AMQP 1.0 frames.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpProtocolCodec extends AbstractProtocolCodec {
-
- int maxFrameSize = 4*1024*1024;
-
- @Override
- protected void encode(Object object) throws IOException {
- nextWriteBuffer.write((Buffer) object);
- }
-
- @Override
- protected Action initialDecodeAction() {
- return new Action() {
- public Object apply() throws IOException {
- Buffer magic = readBytes(8);
- if (magic != null) {
- nextDecodeAction = readFrameSize;
- return new AmqpHeader(magic);
- } else {
- return null;
- }
- }
- };
- }
-
- private final Action readFrameSize = new Action() {
- public Object apply() throws IOException {
- Buffer sizeBytes = peekBytes(4);
- if (sizeBytes != null) {
- int size = sizeBytes.bigEndianEditor().readInt();
- if (size < 8) {
- throw new IOException(String.format("specified frame size %d is smaller than minimum frame size", size));
- }
- if( size > maxFrameSize ) {
- throw new IOException(String.format("specified frame size %d is larger than maximum frame size", size));
- }
-
- // TODO: check frame min and max size..
- nextDecodeAction = readFrame(size);
- return nextDecodeAction.apply();
- } else {
- return null;
- }
- }
- };
-
-
- private final Action readFrame(final int size) {
- return new Action() {
- public Object apply() throws IOException {
- Buffer frameData = readBytes(size);
- if (frameData != null) {
- nextDecodeAction = readFrameSize;
- return frameData;
- } else {
- return null;
- }
- }
- };
- }
-
- public int getReadBytesPendingDecode() {
- return readBuffer.position() - readStart;
- }
-
- public void skipProtocolHeader() {
- nextDecodeAction = readFrameSize;
- }
-
- public void readProtocolHeader() {
- nextDecodeAction = initialDecodeAction();
- }
-
- public int getMaxFrameSize() {
- return maxFrameSize;
- }
-
- public void setMaxFrameSize(int maxFrameSize) {
- this.maxFrameSize = maxFrameSize;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org