You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/05/03 19:47:26 UTC
[2/3] qpid-proton git commit: PROTON-1188,
PROTON-1189: remove stale contrib/proton-jms and
contrib/proton-hawtdispatch modules
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
deleted file mode 100644
index bc3ec2e..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.apache.qpid.proton.hawtdispatch.api.AmqpConnectOptions;
-import org.apache.qpid.proton.hawtdispatch.api.Callback;
-import org.apache.qpid.proton.hawtdispatch.api.ChainedCallback;
-import org.apache.qpid.proton.hawtdispatch.api.TransportState;
-import org.apache.qpid.proton.engine.*;
-import org.apache.qpid.proton.engine.impl.ByteBufferUtils;
-import org.apache.qpid.proton.engine.impl.ProtocolTracer;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.DataByteArrayOutputStream;
-import org.fusesource.hawtbuf.UTF8Buffer;
-import org.fusesource.hawtdispatch.*;
-import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
-import org.fusesource.hawtdispatch.transport.SslTransport;
-import org.fusesource.hawtdispatch.transport.TcpTransport;
-import org.fusesource.hawtdispatch.transport.Transport;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.LinkedList;
-
-import static org.apache.qpid.proton.hawtdispatch.api.TransportState.*;
-import static org.fusesource.hawtdispatch.Dispatch.NOOP;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class AmqpTransport extends WatchBase {
-
- // Current lifecycle state; starts as CREATED and is changed by connecting(),
- // the connect listener in createTransport(), accepted() and disconnect().
- private TransportState state = CREATED;
-
- // Dispatch queue supplied at construction; the deferred-task source is created on it.
- final DispatchQueue queue;
- // Proton connection created once in the constructor and bound to protonTransport in bind().
- final ProtonJConnection connection;
- // Underlying HawtDispatch transport; assigned in bind(), set to null when disconnect() completes.
- Transport hawtdispatchTransport;
- // Proton engine transport; assigned in bind(), set to null when disconnect() completes.
- ProtonJTransport protonTransport;
- // Most recently recorded failure, or null when none has been recorded.
- Throwable failure;
- // Dispatch source that aggregates Defer tasks into a linked list and runs them on the queue.
- CustomDispatchSource<Defer,LinkedList<Defer>> defers;
-
- // Set of every EndpointState; used as both the local and remote filter when walking sessions and links.
- public static final EnumSet<EndpointState> ALL_SET = EnumSet.allOf(EndpointState.class);
-
-    /**
-     * Creates a transport bound to the given dispatch queue and sets up the
-     * dispatch source that runs deferred tasks on that queue.
-     *
-     * @param queue the dispatch queue deferred work is executed on
-     */
-    private AmqpTransport(DispatchQueue queue) {
-        this.queue = queue;
-        this.connection = (ProtonJConnection) Connection.Factory.create();
-
-        defers = Dispatch.createSource(EventAggregators.<Defer>linkedList(), this.queue);
-        defers.setEventHandler(new Task(){
-            public void run() {
-                for( Defer defer: defers.getData() ) {
-                    // defer(Defer) sets the flag before merging the task, so it must
-                    // still be set here. This is a real check: an assignment inside
-                    // the assert would always evaluate to true and verify nothing.
-                    assert defer.defered;
-                    // Clear the flag before running so the task may re-queue itself.
-                    defer.defered = false;
-                    defer.run();
-                }
-            }
-        });
-        defers.resume();
-    }
-
-    /**
-     * Opens a client-side transport. The supplied options are cloned first; a
-     * dispatch queue and a blocking executor are filled in on the clone when unset.
-     *
-     * @param options connection settings
-     * @return the new transport, after its connect sequence has been started
-     */
-    static public AmqpTransport connect(AmqpConnectOptions options) {
-        final AmqpConnectOptions settings = options.clone();
-        if( settings.getDispatchQueue() == null ) {
-            settings.setDispatchQueue(Dispatch.createQueue());
-        }
-        if( settings.getBlockingExecutor() == null ) {
-            settings.setBlockingExecutor(AmqpConnectOptions.getBlockingThreadPool());
-        }
-        final AmqpTransport created = new AmqpTransport(settings.getDispatchQueue());
-        return created.connecting(settings);
-    }
-
- /**
- * Starts the client-side connect sequence: marks the transport CONNECTING, copies the
- * container ids and hostname from the options onto the Proton connection, and creates the
- * socket transport. A Throwable raised along the way is stored in failure, not rethrown.
- *
- * @param options connection settings
- * @return this transport
- */
- private AmqpTransport connecting(final AmqpConnectOptions options) {
- assert state == CREATED;
- try {
- state = CONNECTING;
- if( options.getLocalContainerId()!=null ) {
- connection.setLocalContainerId(options.getLocalContainerId());
- }
- if( options.getRemoteContainerId()!=null ) {
- connection.setContainer(options.getRemoteContainerId());
- }
- connection.setHostname(options.getHost().getHost());
- // Final step of the connect chain: installs the regular listener on success,
- // records the failure and disconnects otherwise.
- Callback<Void> onConnect = new Callback<Void>() {
- @Override
- public void onSuccess(Void value) {
- // Ignore a late success if the state has moved on in the meantime.
- if( state == CONNECTED ) {
- hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
- fireWatches();
- }
- }
-
- @Override
- public void onFailure(Throwable value) {
- if( state == CONNECTED || state == CONNECTING ) {
- failure = value;
- disconnect();
- fireWatches();
- }
- }
- };
- // A user name was supplied: run the SASL client handshake before the final step.
- if( options.getUser()!=null ) {
- onConnect = new SaslClientHandler(options, onConnect);
- }
- createTransport(options, onConnect);
- } catch (Throwable e) {
- // The state is left unchanged here; only the failure is recorded.
- failure = e;
- }
- fireWatches();
- return this;
- }
-
-    /**
-     * @return the current lifecycle state of this transport
-     */
-    public TransportState getState() {
-        return state;
-    }
-
-    /**
-     * Creates and starts a transport to the AMQP server. Calls {@code onConnect.onSuccess}
-     * once the transport is connected, or {@code onConnect.onFailure} if the transport
-     * fails while still connecting.
-     * <p>
-     * When the host URI carries no port, the registered default is used:
-     * 5671 when an SSL context is configured (amqps), 5672 otherwise (amqp).
-     *
-     * @param options connection settings (host, SSL context, executor, socket tuning)
-     * @param onConnect callback notified of the connect outcome
-     * @throws Exception if the transport cannot be configured or started
-     */
-    void createTransport(AmqpConnectOptions options, final Callback<Void> onConnect) throws Exception {
-        final TcpTransport transport;
-        if( options.getSslContext() !=null ) {
-            SslTransport ssl = new SslTransport();
-            ssl.setSSLContext(options.getSslContext());
-            transport = ssl;
-        } else {
-            transport = new TcpTransport();
-        }
-
-        URI host = options.getHost();
-        if( host.getPort() == -1 ) {
-            // No explicit port: TLS connections default to 5671, plain TCP to 5672.
-            final int defaultPort = options.getSslContext()!=null ? 5671 : 5672;
-            host = new URI(host.getScheme()+"://"+host.getHost()+":"+defaultPort);
-        }
-
-
-        transport.setBlockingExecutor(options.getBlockingExecutor());
-        transport.setDispatchQueue(options.getDispatchQueue());
-
-        transport.setMaxReadRate(options.getMaxReadRate());
-        transport.setMaxWriteRate(options.getMaxWriteRate());
-        transport.setReceiveBufferSize(options.getReceiveBufferSize());
-        transport.setSendBufferSize(options.getSendBufferSize());
-        transport.setTrafficClass(options.getTrafficClass());
-        transport.setUseLocalHost(options.isUseLocalHost());
-
-        // Temporary listener that only handles the connect outcome; onConnect is
-        // expected to install the listener used afterwards.
-        transport.setTransportListener(new DefaultTransportListener(){
-            public void onTransportConnected() {
-                if(state==CONNECTING) {
-                    state = CONNECTED;
-                    onConnect.onSuccess(null);
-                    transport.resumeRead();
-                }
-            }
-
-            public void onTransportFailure(final IOException error) {
-                if(state==CONNECTING) {
-                    onConnect.onFailure(error);
-                }
-            }
-
-        });
-        transport.connecting(host, options.getLocalAddress());
-        bind(transport);
-        transport.start(NOOP);
-    }
-
- /**
- * Callback stage that performs the client side of the SASL handshake after the socket
- * is connected, and only then forwards success to the next callback in the chain.
- * Offers PLAIN with the configured user/password, or ANONYMOUS when PLAIN is not offered.
- */
- class SaslClientHandler extends ChainedCallback<Void, Void> {
-
- private final AmqpConnectOptions options;
-
- public SaslClientHandler(AmqpConnectOptions options, Callback<Void> next) {
- super(next);
- this.options = options;
- }
-
- // Invoked once the socket is connected: switches the Proton SASL layer into client
- // mode, schedules output, and installs a listener that drives the handshake.
- public void onSuccess(final Void value) {
- final Sasl s = protonTransport.sasl();
- s.client();
- pumpOut();
- hawtdispatchTransport.setTransportListener(new AmqpTransportListener() {
-
- // Non-null while the handshake is in progress; set to null once it finished or failed.
- Sasl sasl = s;
-
- @Override
- void process() {
- if (sasl != null) {
- sasl = processSaslEvent(sasl);
- if (sasl == null) {
- // once sasl handshake is done.. we need to read the protocol header again.
- ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
- }
- }
- }
-
- // Failures during the handshake go to the chained callback rather than to the AmqpListener.
- @Override
- public void onTransportFailure(IOException error) {
- next.onFailure(error);
- }
-
- @Override
- void onFailure(Throwable error) {
- next.onFailure(error);
- }
-
- // Ensures the credentials are sent only once.
- boolean authSent = false;
-
- // Returns the Sasl object while the handshake should continue, or null once it
- // has been completed (next.onSuccess) or rejected (next.onFailure).
- private Sasl processSaslEvent(Sasl sasl) {
- if (sasl.getOutcome() == Sasl.SaslOutcome.PN_SASL_OK) {
- next.onSuccess(null);
- return null;
- }
- HashSet<String> mechanisims = new HashSet<String>(Arrays.asList(sasl.getRemoteMechanisms()));
- if (!authSent && !mechanisims.isEmpty()) {
- if (mechanisims.contains("PLAIN")) {
- authSent = true;
- DataByteArrayOutputStream os = new DataByteArrayOutputStream();
- try {
- // Initial response layout written here: 0 byte, user, 0 byte, optional password.
- os.writeByte(0);
- os.write(new UTF8Buffer(options.getUser()));
- os.writeByte(0);
- if (options.getPassword() != null) {
- os.write(new UTF8Buffer(options.getPassword()));
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- Buffer buffer = os.toBuffer();
- sasl.setMechanisms(new String[]{"PLAIN"});
- sasl.send(buffer.data, buffer.offset, buffer.length);
- } else if (mechanisims.contains("ANONYMOUS")) {
- authSent = true;
- sasl.setMechanisms(new String[]{"ANONYMOUS"});
- sasl.send(new byte[0], 0, 0);
- } else {
- next.onFailure(Support.illegalState("Remote does not support plain password authentication."));
- return null;
- }
- }
- return sasl;
- }
- });
- }
- }
-
- /**
- * Listener installed on accepted (server-side) transports. It inspects the first protocol
- * header: protocol id 3 starts SASL processing through the AmqpListener, any other id
- * hands the connection straight to a plain AmqpTransportListener.
- */
- class SaslServerListener extends AmqpTransportListener {
- // Set when a SASL header was seen and a listener is present; stays null otherwise.
- Sasl sasl;
-
- @Override
- public void onTransportCommand(Object command) {
- try {
- if (command.getClass() == AmqpHeader.class) {
- AmqpHeader header = (AmqpHeader)command;
- switch( header.getProtocolId() ) {
- case 3: // Client will be using SASL for auth..
- if( listener!=null ) {
- sasl = listener.processSaslConnect(protonTransport);
- break;
- }
- // No listener: falls through to the default branch on purpose.
- default:
- AmqpTransportListener listener = new AmqpTransportListener();
- hawtdispatchTransport.setTransportListener(listener);
- listener.onTransportCommand(command);
- return;
- }
- // SASL case: feed the raw header bytes to the Proton transport below.
- command = header.getBuffer();
- }
- } catch (Exception e) {
- // NOTE(review): after reporting the failure, the command is still passed to super below.
- onFailure(e);
- }
- super.onTransportCommand(command);
- }
-
- @Override
- void process() {
- // NOTE(review): listener is dereferenced here without a null check.
- if (sasl != null) {
- sasl = listener.processSaslEvent(sasl);
- }
- if (sasl == null) {
- // once sasl handshake is done.. we need to read the protocol header again.
- ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
- hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
- }
- }
- }
-
-    /**
-     * Wraps an already accepted (server-side) HawtDispatch transport.
-     *
-     * @param transport the accepted transport; its dispatch queue is reused
-     * @return the new AMQP transport bound to {@code transport}
-     */
-    static public AmqpTransport accept(Transport transport) {
-        final AmqpTransport serverSide = new AmqpTransport(transport.getDispatchQueue());
-        return serverSide.accepted(transport);
-    }
-
-    /**
-     * Marks this transport CONNECTED, binds it to the accepted transport and
-     * installs the listener that handles the initial protocol header.
-     *
-     * @param transport the accepted HawtDispatch transport
-     * @return this transport
-     */
-    private AmqpTransport accepted(final Transport transport) {
-        state = CONNECTED;
-        bind(transport);
-        final SaslServerListener headerListener = new SaslServerListener();
-        hawtdispatchTransport.setTransportListener(headerListener);
-        return this;
-    }
-
-    /**
-     * Remembers the socket transport, creates the Proton transport and binds it to
-     * the connection, and installs an AmqpProtocolCodec when the transport has no codec yet.
-     *
-     * @param transport the HawtDispatch transport to bind to
-     */
-    private void bind(final Transport transport) {
-        hawtdispatchTransport = transport;
-        protonTransport = (ProtonJTransport) org.apache.qpid.proton.engine.Transport.Factory.create();
-        protonTransport.bind(connection);
-        if( transport.getProtocolCodec() != null ) {
-            // A codec was configured by the caller; keep it.
-            return;
-        }
-        try {
-            transport.setProtocolCodec(new AmqpProtocolCodec());
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Queues a task for deferred execution unless it is already queued.
-     *
-     * @param defer the task to queue
-     */
-    public void defer(Defer defer) {
-        if( defer.defered ) {
-            // Already pending; it will run once when the source fires.
-            return;
-        }
-        defer.defered = true;
-        defers.merge(defer);
-    }
-
-    /**
-     * Schedules a deferred pass that moves pending Proton output to the socket
-     * transport. Asserts that the caller is executing on this transport's queue.
-     */
-    public void pumpOut() {
-        assertExecuting();
-        defer(deferedPumpOut);
-    }
-
-    /** Shared deferred task used by pumpOut(); simply delegates to doPumpOut(). */
-    private Defer deferedPumpOut = new Defer() {
-        @Override
-        public void run() {
-            doPumpOut();
-        }
-    };
-
-    /**
-     * Copies pending output from the Proton transport to the socket transport until
-     * Proton has nothing more to write or the socket transport reports it is full.
-     * Does nothing unless the state is CONNECTING or CONNECTED. If anything was written
-     * and the socket transport still has room, the listener (when set) is told it may
-     * produce more output.
-     */
-    private void doPumpOut() {
-        if( state != CONNECTING && state != CONNECTED ) {
-            return;
-        }
-
-        int size = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize();
-        byte data[] = new byte[size];
-        boolean done = false;
-        int pumped = 0;
-        while( !done && !hawtdispatchTransport.full() ) {
-            int count = protonTransport.output(data, 0, size);
-            if( count > 0 ) {
-                pumped += count;
-                boolean accepted = hawtdispatchTransport.offer(new Buffer(data, 0, count));
-                assert accepted: "Should be accepted since the transport was not full";
-            } else {
-                done = true;
-            }
-        }
-        // setListener(null) is allowed, so guard the callback the same way
-        // AmqpTransportListener.onRefill() does instead of throwing a NullPointerException.
-        if( pumped > 0 && !hawtdispatchTransport.full() && listener != null ) {
-            listener.processRefill();
-        }
-    }
-
- // SASL handshake tracked by fireListenerEvents(); null when no handshake is in progress.
- // NOTE(review): only reassigned from itself inside this class; presumably set by external code - confirm.
- public Sasl sasl;
- /**
- * Runs one notification pass: fires the watches, advances a pending SASL handshake, then
- * lets the connection, every session and every link fire their listener events, hands each
- * delivery on the connection's work list to the listener, and finally signals a refill.
- * NOTE(review): listener is dereferenced without a null check in this method.
- */
- public void fireListenerEvents() {
- fireWatches();
-
- if( sasl!=null ) {
- sasl = listener.processSaslEvent(sasl);
- if( sasl==null ) {
- // once sasl handshake is done.. we need to read the protocol header again.
- ((AmqpProtocolCodec)this.hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
- }
- }
-
- context(connection).fireListenerEvents(listener);
-
- // Walk every session regardless of its local/remote endpoint state.
- Session session = connection.sessionHead(ALL_SET, ALL_SET);
- while(session != null)
- {
- context(session).fireListenerEvents(listener);
- session = session.next(ALL_SET, ALL_SET);
- }
-
- // Walk every link regardless of its local/remote endpoint state.
- Link link = connection.linkHead(ALL_SET, ALL_SET);
- while(link != null)
- {
- context(link).fireListenerEvents(listener);
- link = link.next(ALL_SET, ALL_SET);
- }
-
- // Hand each delivery on the connection's work list to the listener.
- Delivery delivery = connection.getWorkHead();
- while(delivery != null)
- {
- listener.processDelivery(delivery);
- delivery = delivery.getWorkNext();
- }
-
- listener.processRefill();
- }
-
-
-    /**
-     * @return the Proton connection created together with this transport
-     */
-    public ProtonJConnection connection() {
-        return connection;
-    }
-
- // Receiver of protocol events; starts out as a plain AmqpListener instance.
- AmqpListener listener = new AmqpListener();
- /**
- * @return the listener currently receiving protocol events
- */
- public AmqpListener getListener() {
- return listener;
- }
-
- /**
- * Replaces the listener that receives protocol events.
- * NOTE(review): several call sites in this class check for a null listener while others
- * dereference it directly - confirm whether null is a supported value.
- *
- * @param listener the new listener
- */
- public void setListener(AmqpListener listener) {
- this.listener = listener;
- }
-
-    /**
-     * Returns the EndpointContext stored on the endpoint, creating and storing
-     * a new one on first use.
-     *
-     * @param endpoint the Proton endpoint (connection, session or link)
-     * @return the context associated with {@code endpoint}
-     */
-    public EndpointContext context(Endpoint endpoint) {
-        final Object existing = endpoint.getContext();
-        if( existing != null ) {
-            return (EndpointContext) existing;
-        }
-        final EndpointContext created = new EndpointContext(this, endpoint);
-        endpoint.setContext(created);
-        return created;
-    }
-
- /**
- * Regular transport listener: feeds bytes received from the socket transport into the
- * Proton transport, triggers listener notifications and forwards transport events to
- * the AmqpListener when one is set.
- */
- class AmqpTransportListener extends DefaultTransportListener {
-
- @Override
- public void onTransportConnected() {
- if( listener!=null ) {
- listener.processTransportConnected();
- }
- }
-
- @Override
- public void onRefill() {
- if( listener!=null ) {
- listener.processRefill();
- }
- }
-
- // Accepts either an AmqpHeader or a Buffer, pours its bytes into the Proton input
- // buffer, then runs process() and schedules output. Ignored unless CONNECTED.
- @Override
- public void onTransportCommand(Object command) {
- if( state != CONNECTED ) {
- return;
- }
- try {
- Buffer buffer;
- if (command.getClass() == AmqpHeader.class) {
- buffer = ((AmqpHeader) command).getBuffer();
- } else {
- buffer = (Buffer) command;
- }
- ByteBuffer bbuffer = buffer.toByteBuffer();
- // Repeat until every received byte has been handed to Proton.
- do {
- ByteBuffer input = protonTransport.getInputBuffer();
- ByteBufferUtils.pour(bbuffer, input);
- protonTransport.processInput();
- } while (bbuffer.remaining() > 0);
- process();
- pumpOut();
- } catch (Exception e) {
- onFailure(e);
- }
- }
-
- // Hook run after input was processed; subclasses override it to drive a SASL handshake.
- void process() {
- fireListenerEvents();
- }
-
- // NOTE(review): the watches are only fired when a listener is set.
- @Override
- public void onTransportFailure(IOException error) {
- if( state==CONNECTED ) {
- failure = error;
- if( listener!=null ) {
- listener.processTransportFailure(error);
- fireWatches();
- }
- }
- }
-
- // Records a processing failure and reports it to the listener when one is set.
- void onFailure(Throwable error) {
- failure = error;
- if( listener!=null ) {
- listener.processFailure(error);
- fireWatches();
- }
- }
- }
-
-    /**
-     * Starts shutting the transport down. Has no effect unless the state is CONNECTING
-     * or CONNECTED. The state becomes DISCONNECTING and, once the socket transport has
-     * stopped, DISCONNECTED; the watches are fired at that point. Must be called on
-     * this transport's dispatch queue.
-     */
-    public void disconnect() {
-        assertExecuting();
-        if( state != CONNECTING && state != CONNECTED ) {
-            return;
-        }
-        state = DISCONNECTING;
-        if( hawtdispatchTransport != null ) {
-            hawtdispatchTransport.stop(new Task(){
-                public void run() {
-                    state = DISCONNECTED;
-                    hawtdispatchTransport = null;
-                    protonTransport = null;
-                    fireWatches();
-                }
-            });
-        } else {
-            // No socket transport was ever bound (the connect sequence failed before
-            // bind()), so there is nothing to stop. Complete the disconnect right away;
-            // otherwise the state would stay DISCONNECTING and watches would never fire.
-            state = DISCONNECTED;
-            protonTransport = null;
-            fireWatches();
-        }
-    }
-
-    /**
-     * @return the dispatch queue this transport was created with
-     */
-    public DispatchQueue queue() {
-        return queue;
-    }
-
-    /**
-     * Delegates to the dispatch queue's own {@code assertExecuting()} check.
-     */
-    public void assertExecuting() {
-        final DispatchQueue owner = queue();
-        owner.assertExecuting();
-    }
-
-    /**
-     * Registers a one-shot callback for the connect outcome: it fails as soon as a
-     * failure is recorded, and succeeds as soon as the state is no longer CONNECTING.
-     *
-     * @param cb the callback to notify
-     */
-    public void onTransportConnected(final Callback<Void> cb) {
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                if( failure == null && state == CONNECTING ) {
-                    // Still in progress; keep the watch registered.
-                    return false;
-                }
-                if( failure != null ) {
-                    cb.onFailure(failure);
-                } else {
-                    cb.onSuccess(null);
-                }
-                return true;
-            }
-        });
-    }
-
-    /**
-     * Registers a one-shot callback that succeeds once the state is DISCONNECTED.
-     *
-     * @param cb the callback to notify
-     */
-    public void onTransportDisconnected(final Callback<Void> cb) {
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                final boolean disconnected = state == DISCONNECTED;
-                if( disconnected ) {
-                    cb.onSuccess(null);
-                }
-                return disconnected;
-            }
-        });
-    }
-
-    /**
-     * Registers a one-shot callback that receives the recorded failure through
-     * {@code onSuccess} as soon as one is present.
-     *
-     * @param cb the callback to hand the failure to
-     */
-    public void onTransportFailure(final Callback<Throwable> cb) {
-        addWatch(new Watch() {
-            @Override
-            public boolean execute() {
-                if( failure == null ) {
-                    return false;
-                }
-                cb.onSuccess(failure);
-                return true;
-            }
-        });
-    }
-
-    /**
-     * @return the recorded failure, or {@code null} when none has been recorded
-     */
-    public Throwable getFailure() {
-        return failure;
-    }
-
-    /**
-     * Installs a protocol tracer on the Proton transport. The Proton transport only
-     * exists between bind() and the completion of disconnect().
-     *
-     * @param protocolTracer the tracer to install
-     */
-    public void setProtocolTracer(ProtocolTracer protocolTracer) {
-        protonTransport.setProtocolTracer(protocolTracer);
-    }
-
-    /**
-     * Reads the protocol tracer from the Proton transport. The Proton transport only
-     * exists between bind() and the completion of disconnect().
-     *
-     * @return the tracer currently set on the Proton transport
-     */
-    public ProtocolTracer getProtocolTracer() {
-        return protonTransport.getProtocolTracer();
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
deleted file mode 100644
index eee8241..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.fusesource.hawtdispatch.Task;
-
-/**
- * A {@link Task} that carries a flag telling whether it is currently queued
- * for deferred execution.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract public class Defer extends Task {
-    /** True while the task is queued and has not been run yet. */
-    boolean defered;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
deleted file mode 100644
index c12a849..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.fusesource.hawtdispatch.Task;
-
-/**
- * Per-endpoint bookkeeping: remembers the owning transport, an optional attachment
- * and whether a remote open/close notification is still being processed by the listener.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class EndpointContext {
-
-    private final AmqpTransport transport;
-    private final Endpoint endpoint;
-    private Object attachment;
-    /** True from the moment a remote open/close is reported until its ProcessedTask runs. */
-    boolean listenerProcessing;
-
-    public EndpointContext(AmqpTransport transport, Endpoint endpoint) {
-        this.transport = transport;
-        this.endpoint = endpoint;
-    }
-
-    /** Completion task handed to the listener; re-enables notifications and schedules output. */
-    class ProcessedTask extends Task {
-        @Override
-        public void run() {
-            transport.assertExecuting();
-            listenerProcessing = false;
-            transport.pumpOut();
-        }
-    }
-
-    /**
-     * Reports a remote open or remote close to the listener when the endpoint states
-     * call for it and no earlier report is still pending, then runs the attachment
-     * if it is a Task.
-     *
-     * @param listener the listener to notify; may be {@code null}
-     */
-    public void fireListenerEvents(AmqpListener listener) {
-        final boolean mayNotify = listener != null && !listenerProcessing;
-        if( mayNotify ) {
-            if( remoteOpenedFirst() ) {
-                listenerProcessing = true;
-                listener.processRemoteOpen(endpoint, new ProcessedTask());
-            } else if( remoteClosedFirst() ) {
-                listenerProcessing = true;
-                listener.processRemoteClose(endpoint, new ProcessedTask());
-            }
-        }
-        // instanceof is false for null, so no separate null test is needed.
-        if( attachment instanceof Task ) {
-            ((Task) attachment).run();
-        }
-    }
-
-    /** Local side not yet initialized while the remote side already is. */
-    private boolean remoteOpenedFirst() {
-        return endpoint.getLocalState() == EndpointState.UNINITIALIZED
-            && endpoint.getRemoteState() != EndpointState.UNINITIALIZED;
-    }
-
-    /** Local side still active while the remote side is closed. */
-    private boolean remoteClosedFirst() {
-        return endpoint.getLocalState() == EndpointState.ACTIVE
-            && endpoint.getRemoteState() == EndpointState.CLOSED;
-    }
-
-    public Object getAttachment() {
-        return attachment;
-    }
-
-    /** Returns the attachment cast to the requested type. */
-    public <T> T getAttachment(Class<T> clazz) {
-        final Object current = getAttachment();
-        return clazz.cast(current);
-    }
-
-    public void setAttachment(Object attachment) {
-        this.attachment = attachment;
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
deleted file mode 100644
index 8d6f83b..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-/**
- * Factory helpers for the IllegalStateException instances used by this package.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class Support {
-
-    /**
-     * Builds an IllegalStateException with the given message and a refreshed stack trace.
-     *
-     * @param msg the exception message
-     * @return the new exception (not thrown)
-     */
-    public static IllegalStateException illegalState(String msg) {
-        final IllegalStateException error = new IllegalStateException(msg);
-        error.fillInStackTrace();
-        return error;
-    }
-
-    /** @return an exception with the message "Unhandled event." */
-    public static IllegalStateException createUnhandledEventError() {
-        return illegalState("Unhandled event.");
-    }
-
-    /** @return an exception reporting that no connection listener is set */
-    public static IllegalStateException createListenerNotSetError() {
-        return illegalState("No connection listener set to handle message received from the server.");
-    }
-
-    /** @return an exception with the message "Disconnected" */
-    public static IllegalStateException createDisconnectedError() {
-        return illegalState("Disconnected");
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
deleted file mode 100644
index 6bb7603..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-/**
- * A condition that is re-evaluated until it reports that it has been triggered.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public abstract class Watch {
-    /**
-     * Evaluates the condition.
-     *
-     * @return true if the watch has been triggered
-     */
-    public abstract boolean execute();
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
deleted file mode 100644
index a4b1591..0000000
--- a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.hawtdispatch.impl;
-
-import org.fusesource.hawtdispatch.Dispatch;
-import org.fusesource.hawtdispatch.Task;
-
-import java.util.LinkedList;
-
-/**
- * Base class that keeps a list of {@link Watch} conditions and re-evaluates them
- * on request, dropping each watch once it reports that it was triggered.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-abstract public class WatchBase {
-
-    private LinkedList<Watch> watches = new LinkedList<Watch>();
-
-    /**
-     * Registers a watch and schedules an evaluation pass.
-     *
-     * @param task the watch to register
-     */
-    protected void addWatch(final Watch task) {
-        watches.add(task);
-        fireWatches();
-    }
-
-    /**
-     * Schedules one evaluation pass on the current dispatch queue when at least
-     * one watch is registered. Watches that do not trigger stay registered.
-     */
-    protected void fireWatches() {
-        if( watches.isEmpty() ) {
-            return;
-        }
-        Dispatch.getCurrentQueue().execute(new Task(){
-            @Override
-            public void run() {
-                // Swap in a fresh list first so watches added while executing are kept.
-                final LinkedList<Watch> pending = watches;
-                watches = new LinkedList<Watch>();
-                for (Watch candidate : pending) {
-                    final boolean triggered = candidate.execute();
-                    if( !triggered ) {
-                        watches.add(candidate);
-                    }
-                }
-            }
-        });
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
deleted file mode 100644
index d4bc733..0000000
--- a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.api;
-
-import static junit.framework.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.net.URISyntaxException;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.hawtdispatch.test.MessengerServer;
-import org.apache.qpid.proton.message.Message;
-import org.fusesource.hawtdispatch.Task;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-/**
- * Hello world!
- *
- */
-
-public class SampleTest {
-
- private static final Logger _logger = Logger.getLogger(SampleTest.class.getName());
-
- private MessengerServer server;
-
- @Before
- public void startServer() {
- server = new MessengerServer();
- server.start();
- }
-
- @After
- public void stopServer() {
- server.stop();
- }
-
- @Test
- public void test() throws Exception {
- int expected = 10;
- final AtomicInteger countdown = new AtomicInteger(expected);
- AmqpConnectOptions options = new AmqpConnectOptions();
- final String container = UUID.randomUUID().toString();
- try {
- options.setHost(server.getHost(), server.getPort());
- options.setLocalContainerId(container);
- options.setUser("anonymous");
- options.setPassword("changeit");
- } catch (URISyntaxException e) {
- e.printStackTrace();
- }
- final AmqpConnection conn = AmqpConnection.connect(options );
- _logger.fine("connection queue");
- conn.queue().execute(new Task() {
-
- @Override
- public void run() {
- _logger.fine("connection running, setup callbacks");
- conn.onTransportFailure(new Callback<Throwable>() {
-
- @Override
- public void onSuccess(Throwable value) {
- _logger.fine("transportFailure Success? " + str(value));
- conn.close();
- }
-
- @Override
- public void onFailure(Throwable value) {
- _logger.fine("transportFailure Trouble! " + str(value));
- conn.close();
- }
- });
-
- conn.onConnected(new Callback<Void>() {
-
- @Override
- public void onSuccess(Void value) {
- _logger.fine("on connect Success! in container " + container);
- final AmqpSession session = conn.createSession();
- Target rqtarget = new Target();
- rqtarget.setAddress("rq-tgt");
- final AmqpSender sender = session.createSender(rqtarget, QoS.AT_LEAST_ONCE, "request-yyy");
- Source rqsource = new Source();
- rqsource.setAddress("rs-src");
- sender.getEndpoint().setSource(rqsource);
- Source rssource = new Source();
- rssource.setAddress("rs-src");
- final AmqpReceiver receiver = session.createReceiver(rssource , QoS.AT_LEAST_ONCE, 10, "response-yyy");
- Target rstarget = new Target();
- final String address = "rs-tgt";
- rstarget.setAddress(address);
- receiver.getEndpoint().setTarget(rstarget);
- sender.onRemoteClose(new Callback<ErrorCondition>() {
-
- @Override
- public void onSuccess(ErrorCondition value) {
- _logger.fine("sender remote close!" + str(value));
- }
-
- @Override
- public void onFailure(Throwable value) {
- _logger.fine("sender remote close Trouble!" + str(value));
- conn.close();
-
- }
-
- });
- receiver.onRemoteClose(new Callback<ErrorCondition>() {
-
- @Override
- public void onSuccess(ErrorCondition value) {
- _logger.fine("receiver remote close!" + str(value));
- }
-
- @Override
- public void onFailure(Throwable value) {
- _logger.fine("receiver remote close Trouble!" + str(value));
- conn.close();
-
- }
-
- });
-
- final Task work = new Task() {
-
- private AtomicInteger count = new AtomicInteger();
-
- @Override
- public void run() {
- Message message = session.createTextMessage("hello world! " + String.valueOf(count.incrementAndGet()));
- message.setAddress("amqp://joze/rq-src");
- String reply_to = "amqp://" + container + "/" + address;
- message.setReplyTo(reply_to);
- message.setCorrelationId("correlator");
- final MessageDelivery md = sender.send(message);
- md.onRemoteStateChange(new Callback<DeliveryState>() {
-
- @Override
- public void onSuccess(DeliveryState value) {
- _logger.fine("delivery remote state change! " + str(value) +
- " local: "+ str(md.getLocalState()) +
- " remote: " + str(md.getRemoteState()));
- }
-
- @Override
- public void onFailure(Throwable value) {
- _logger.fine("remote state change Trouble!" + str(value));
- conn.close();
- }
-
- });
- md.onSettle(new Callback<DeliveryState>() {
-
- @Override
- public void onSuccess(DeliveryState value) {
- _logger.fine("delivery settled! " + str(value) +
- " local: "+ str(md.getLocalState()) +
- " remote: " + str(md.getRemoteState()));
- _logger.fine("sender settle mode state " +
- " local receiver " + str(sender.getEndpoint().getReceiverSettleMode()) +
- " local sender " + str(sender.getEndpoint().getSenderSettleMode()) +
- " remote receiver " + str(sender.getEndpoint().getRemoteReceiverSettleMode()) +
- " remote sender " + str(sender.getEndpoint().getRemoteSenderSettleMode()) +
- ""
- );
- }
-
- @Override
- public void onFailure(Throwable value) {
- _logger.fine("delivery sending Trouble!" + str(value));
- conn.close();
- }
- });
- }
-
- };
- receiver.setDeliveryListener(new AmqpDeliveryListener() {
-
- @Override
- public void onMessageDelivery(
- MessageDelivery delivery) {
- Message message = delivery.getMessage();
- _logger.fine("incoming message delivery! " +
- " local " + str(delivery.getLocalState()) +
- " remote " + str(delivery.getRemoteState()) +
- " message " + str(message.getBody()) +
- "");
- delivery.onSettle(new Callback<DeliveryState>() {
-
- @Override
- public void onSuccess(DeliveryState value) {
- _logger.fine("incoming message settled! ");
- int i = countdown.decrementAndGet();
- if ( i > 0 ) {
- _logger.fine("More work " + str(i));
- work.run();
- } else {
- conn.queue().executeAfter(100, TimeUnit.MILLISECONDS, new Task() {
-
- @Override
- public void run() {
- _logger.fine("stopping sender");
- sender.close();
- }
- });
- conn.queue().executeAfter(200, TimeUnit.MILLISECONDS, new Task() {
-
- @Override
- public void run() {
- _logger.fine("stopping receiver");
- receiver.close();
-
- }
- });
- conn.queue().executeAfter(300, TimeUnit.MILLISECONDS, new Task() {
-
- @Override
- public void run() {
- _logger.fine("stopping session");
- session.close();
-
- }
- });
- conn.queue().executeAfter(400, TimeUnit.MILLISECONDS, new Task() {
-
- @Override
- public void run() {
- _logger.fine("stopping connection");
- conn.close();
-
- }
- });
- }
- }
-
- @Override
- public void onFailure(Throwable value) {
- _logger.fine("trouble settling incoming message " + str(value));
- conn.close();
- }
- });
- delivery.settle();
- }
-
- });
-
- // start the receiver
- receiver.resume();
-
- // send first message
- conn.queue().execute(work);
- }
-
- @Override
- public void onFailure(Throwable value) {
- _logger.fine("on connect Failure?" + str(value));
- conn.close();
- }
- });
- _logger.fine("connection setup done");
-
-
- }
-
- });
- try {
- _logger.fine("Waiting...");
- Future<Void> disconnectedFuture = conn.getDisconnectedFuture();
- disconnectedFuture.await(10, TimeUnit.SECONDS);
- _logger.fine("done");
- assertEquals(expected, server.getMessagesReceived());
- } catch (Exception e) {
- _logger.log(Level.SEVERE, "Test failed, possibly due to timeout", e);
- throw e;
- }
- }
-
- private String str(Object value) {
- if (value == null)
- return "null";
- return value.toString();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
deleted file mode 100644
index 96c772b..0000000
--- a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.hawtdispatch.test;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.qpid.proton.InterruptException;
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.Tracker;
-
-public class MessengerServer {
- public static final String REJECT_ME = "*REJECT-ME*";
- private int timeout = 1000;
- private String host = "127.0.0.1";
- private int port = 55555;
- private Messenger msgr;
- private AtomicInteger messagesReceived = new AtomicInteger(0);
- private AtomicInteger messagesSent = new AtomicInteger(0);
- private AtomicBoolean serverShouldRun = new AtomicBoolean();
- private AtomicReference<Throwable> issues = new AtomicReference<Throwable>();
- private Thread thread;
- private CountDownLatch serverStart;
-
- public MessengerServer() {
- }
- public void start() {
- if (!serverShouldRun.compareAndSet(false, true)) {
- throw new IllegalStateException("started twice");
- }
- msgr = Proton.messenger();
- serverStart = new CountDownLatch(1);
- thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- msgr.start();
- msgr.subscribe("amqp://~"+host+":"+String.valueOf(port));
- serverStart.countDown();
- try {
- while(serverShouldRun.get()) {
- msgr.recv(100);
- while (msgr.incoming() > 0) {
- Message msg = msgr.get();
- messagesReceived.incrementAndGet();
- Tracker tracker = msgr.incomingTracker();
- if (REJECT_ME.equals(msg.getBody())) {
- msgr.reject(tracker , 0);
- } else {
- msgr.accept(tracker, 0);
- }
- String reply_to = msg.getReplyTo();
- if (reply_to != null) {
- msg.setAddress(reply_to);
- msgr.put(msg);
- msgr.settle(msgr.outgoingTracker(), 0);
- }
- }
- }
- } finally {
- msgr.stop();
- }
- } catch (InterruptException ex) {
- // we're done
- } catch (Exception ex) {
- issues.set(ex);
- }
- }
-
- });
- thread.setName("MessengerServer");
- thread.setDaemon(true);
- thread.start();
- try {
- serverStart.await();
- } catch (InterruptedException e) {
- msgr.interrupt();
- }
- }
-
- public void stop() {
- if (!serverShouldRun.compareAndSet(true, false)) {
- return;
- }
- if (serverStart.getCount() == 0)
- msgr.interrupt();
- try {
- thread.join(timeout);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- thread = null;
- if (!msgr.stopped())
- msgr.stop();
- Throwable throwable = issues.get();
- if (throwable != null)
- throw new RuntimeException("Messenger server had problems", throwable);
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
- public int getTimeout() {
- return timeout;
- }
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
-
- public int getMessagesReceived() {
- return messagesReceived.get();
- }
-
- public int getMessagesSent() {
- return messagesSent.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/pom.xml b/contrib/proton-jms/pom.xml
deleted file mode 100644
index f1a51c6..0000000
--- a/contrib/proton-jms/pom.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <parent>
- <groupId>org.apache.qpid</groupId>
- <artifactId>proton-project</artifactId>
- <version>0.13.0-SNAPSHOT</version>
- <relativePath>../..</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>proton-jms</artifactId>
- <name>proton-jms</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>proton-j</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
- <version>1.1.1</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-
- <build>
- </build>
- <scm>
- <url>http://svn.apache.org/viewvc/qpid/proton/</url>
- </scm>
-
-</project>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java
deleted file mode 100644
index 4beb401..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import javax.jms.Message;
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
-
-
- public AMQPNativeInboundTransformer(JMSVendor vendor) {
- super(vendor);
- }
-
- @Override
- public Message transform(EncodedMessage amqpMessage) throws Exception {
- org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
-
- Message rc = super.transform(amqpMessage);
-
- populateMessage(rc, amqp);
- return rc;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
deleted file mode 100644
index 7dc71d7..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.messaging.Header;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageFormatException;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.message.ProtonJMessage;
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class AMQPNativeOutboundTransformer extends OutboundTransformer {
-
- public AMQPNativeOutboundTransformer(JMSVendor vendor) {
- super(vendor);
- }
-
- @Override
- public EncodedMessage transform(Message msg) throws Exception {
- if( msg == null )
- return null;
- if( !(msg instanceof BytesMessage) )
- return null;
- try {
- if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
- return null;
- }
- } catch (MessageFormatException e) {
- return null;
- }
- return transform(this, (BytesMessage) msg);
- }
-
- static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
- long messageFormat;
- try {
- messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
- } catch (MessageFormatException e) {
- return null;
- }
- byte data[] = new byte[(int) msg.getBodyLength()];
- int dataSize = data.length;
- msg.readBytes(data);
- msg.reset();
-
- try {
- int count = msg.getIntProperty("JMSXDeliveryCount");
- if( count > 1 ) {
-
- // decode...
- ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
- int offset = 0;
- int len = data.length;
- while( len > 0 ) {
- final int decoded = amqp.decode(data, offset, len);
- assert decoded > 0: "Make progress decoding the message";
- offset += decoded;
- len -= decoded;
- }
-
- // Update the DeliveryCount header...
- // The AMQP delivery-count field only includes prior failed delivery attempts,
- // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
- if (amqp.getHeader() == null) {
- amqp.setHeader(new Header());
- }
-
- amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
-
- // Re-encode...
- ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
- final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
- int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
- if( overflow.position() > 0 ) {
- buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
- c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
- }
- data = buffer.array();
- dataSize = c;
- }
- } catch (JMSException e) {
- }
-
- return new EncodedMessage(messageFormat, data, 0, dataSize);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java
deleted file mode 100644
index 9baabdf..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import javax.jms.BytesMessage;
-import javax.jms.Message;
-
-public class AMQPRawInboundTransformer extends InboundTransformer {
-
- public AMQPRawInboundTransformer(JMSVendor vendor) {
- super(vendor);
- }
-
- @Override
- public Message transform(EncodedMessage amqpMessage) throws Exception {
- BytesMessage rc = vendor.createBytesMessage();
- rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
-
- rc.setJMSDeliveryMode(defaultDeliveryMode);
- rc.setJMSPriority(defaultPriority);
-
- final long now = System.currentTimeMillis();
- rc.setJMSTimestamp(now);
- if( defaultTtl > 0 ) {
- rc.setJMSExpiration(now + defaultTtl);
- }
-
- rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
- rc.setBooleanProperty(prefixVendor + "NATIVE", true);
-
- return rc;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java
deleted file mode 100644
index f62760b..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import javax.jms.BytesMessage;
-import javax.jms.Message;
-
-/**
-* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-*/
-public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
-
- private final JMSMappingOutboundTransformer transformer;
-
- public AutoOutboundTransformer(JMSVendor vendor) {
- super(vendor);
- transformer = new JMSMappingOutboundTransformer(vendor);
- }
-
- @Override
- public EncodedMessage transform(Message msg) throws Exception {
- if( msg == null )
- return null;
- if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
- if( msg instanceof BytesMessage ) {
- return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg);
- } else {
- return null;
- }
- } else {
- return transformer.transform(msg);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java
deleted file mode 100644
index 19602c9..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.amqp.Binary;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class EncodedMessage
-{
-
- private final Binary data;
- final long messageFormat;
-
- public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
- this.data = new Binary(data, offset, length);
- this.messageFormat = messageFormat;
- }
-
- public long getMessageFormat() {
- return messageFormat;
- }
-
- public Message decode() throws Exception {
- Message amqp = Message.Factory.create();
-
- int offset = getArrayOffset();
- int len = getLength();
- while( len > 0 ) {
- final int decoded = amqp.decode(getArray(), offset, len);
- assert decoded > 0: "Make progress decoding the message";
- offset += decoded;
- len -= decoded;
- }
-
- return amqp;
- }
-
- public int getLength()
- {
- return data.getLength();
- }
-
- public int getArrayOffset()
- {
- return data.getArrayOffset();
- }
-
- public byte[] getArray()
- {
- return data.getArray();
- }
-
- @Override
- public String toString()
- {
- return data.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java
deleted file mode 100644
index 0374e6a..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.amqp.*;
-import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Footer;
-import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Properties;
-
-import javax.jms.*;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Queue;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Base class for transformers that turn an {@link EncodedMessage} into a JMS
- * {@link Message}. Subclasses implement {@link #transform(EncodedMessage)};
- * {@link #populateMessage} copies the AMQP header, annotations, properties
- * and footer onto an already created JMS message.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public abstract class InboundTransformer {
-
-    JMSVendor vendor;
-
-    public static final String TRANSFORMER_NATIVE = "native";
-    public static final String TRANSFORMER_RAW = "raw";
-    public static final String TRANSFORMER_JMS = "jms";
-
-    // Prefixes used to build the JMS property names for AMQP sections that
-    // have no direct JMS counterpart.
-    String prefixVendor = "JMS_AMQP_";
-    String prefixDeliveryAnnotations = "DA_";
-    String prefixMessageAnnotations= "MA_";
-    String prefixFooter = "FT_";
-
-    // Values applied when the AMQP header does not carry the field.
-    int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
-    int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
-    long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
-
-    /**
-     * @param vendor factory/adapter used to create destinations and to set the
-     *        JMSX properties on messages
-     */
-    public InboundTransformer(JMSVendor vendor) {
-        this.vendor = vendor;
-    }
-
-    /**
-     * Converts an encoded AMQP message into a JMS message.
-     *
-     * @param amqpMessage the encoded AMQP message
-     * @return the resulting JMS message
-     * @throws Exception if the conversion fails
-     */
-    abstract public Message transform(EncodedMessage amqpMessage) throws Exception;
-
-    public int getDefaultDeliveryMode() {
-        return defaultDeliveryMode;
-    }
-
-    public void setDefaultDeliveryMode(int defaultDeliveryMode) {
-        this.defaultDeliveryMode = defaultDeliveryMode;
-    }
-
-    public int getDefaultPriority() {
-        return defaultPriority;
-    }
-
-    public void setDefaultPriority(int defaultPriority) {
-        this.defaultPriority = defaultPriority;
-    }
-
-    public long getDefaultTtl() {
-        return defaultTtl;
-    }
-
-    public void setDefaultTtl(long defaultTtl) {
-        this.defaultTtl = defaultTtl;
-    }
-
-    public String getPrefixVendor() {
-        return prefixVendor;
-    }
-
-    public void setPrefixVendor(String prefixVendor) {
-        this.prefixVendor = prefixVendor;
-    }
-
-    public JMSVendor getVendor() {
-        return vendor;
-    }
-
-    public void setVendor(JMSVendor vendor) {
-        this.vendor = vendor;
-    }
-
-    /**
-     * Copies the sections of a decoded AMQP message onto a JMS message.
-     * <ul>
-     * <li>Header: durable, priority, first-acquirer and delivery-count; the
-     *     configured defaults are used for a missing durable or priority.</li>
-     * <li>Delivery annotations, message annotations and footer entries become
-     *     JMS properties whose names start with the vendor prefix followed by
-     *     the section prefix.</li>
-     * <li>The message annotations {@code x-opt-jms-type},
-     *     {@code x-opt-to-type} and {@code x-opt-reply-type} set the JMS type
-     *     and select the destination classes instead. An entry with a null
-     *     value is not interpreted and is passed on as a plain property.</li>
-     * <li>Application properties are copied by name, except
-     *     {@code JMSXGroupID}, {@code JMSXGroupSequence} and
-     *     {@code JMSXUserID}, which go through the vendor when non-null.</li>
-     * <li>The properties section is mapped onto the JMS header fields and
-     *     vendor-prefixed string properties.</li>
-     * <li>If the expiration is still 0 afterwards it is derived from the
-     *     header TTL, falling back to the default TTL.</li>
-     * </ul>
-     *
-     * @param jms the JMS message to populate
-     * @param amqp the decoded AMQP message to read from
-     * @throws Exception if a JMS setter or the vendor fails
-     */
-    protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
-        Header header = amqp.getHeader();
-        if( header==null ) {
-            // An empty header lets the code below apply the defaults uniformly.
-            header = new Header();
-        }
-
-        if( header.getDurable()!=null ) {
-            jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-        } else {
-            jms.setJMSDeliveryMode(defaultDeliveryMode);
-        }
-        if( header.getPriority()!=null ) {
-            jms.setJMSPriority(header.getPriority().intValue());
-        } else {
-            jms.setJMSPriority(defaultPriority);
-        }
-        if( header.getFirstAcquirer() !=null ) {
-            jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
-        }
-        if( header.getDeliveryCount()!=null ) {
-            vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
-        }
-
-        final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
-        if( da!=null ) {
-            for (Map.Entry<?,?> entry : da.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
-            }
-        }
-
-        // Destination classes used further down for "to" and "reply-to"; the
-        // message annotations may narrow them.
-        Class<? extends Destination> toAttributes = Destination.class;
-        Class<? extends Destination> replyToAttributes = Destination.class;
-
-        final MessageAnnotations ma = amqp.getMessageAnnotations();
-        if( ma!=null ) {
-            for (Map.Entry<?,?> entry : ma.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                Object value = entry.getValue();
-                // A null value cannot be interpreted; like x-opt-jms-type it
-                // falls through to setProperty instead of raising an NPE.
-                if( value != null && "x-opt-jms-type".equals(key) ) {
-                    jms.setJMSType(value.toString());
-                } else if( value != null && "x-opt-to-type".equals(key) ) {
-                    toAttributes = toClassFromAttributes(value.toString());
-                } else if( value != null && "x-opt-reply-type".equals(key) ) {
-                    replyToAttributes = toClassFromAttributes(value.toString());
-                } else {
-                    setProperty(jms, prefixVendor + prefixMessageAnnotations + key, value);
-                }
-            }
-        }
-
-        final ApplicationProperties ap = amqp.getApplicationProperties();
-        if( ap !=null ) {
-            for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                Object value = entry.getValue();
-                // Null values are handed to setProperty rather than dereferenced.
-                if( value != null && "JMSXGroupID".equals(key) ) {
-                    vendor.setJMSXGroupID(jms, value.toString());
-                } else if( value != null && "JMSXGroupSequence".equals(key) ) {
-                    vendor.setJMSXGroupSequence(jms, ((Number)value).intValue());
-                } else if( value != null && "JMSXUserID".equals(key) ) {
-                    vendor.setJMSXUserID(jms, value.toString());
-                } else {
-                    setProperty(jms, key, value);
-                }
-            }
-        }
-
-        final Properties properties = amqp.getProperties();
-        if( properties!=null ) {
-            if( properties.getMessageId()!=null ) {
-                jms.setJMSMessageID(properties.getMessageId().toString());
-            }
-            Binary userId = properties.getUserId();
-            if( userId!=null ) {
-                vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
-            }
-            if( properties.getTo()!=null ) {
-                jms.setJMSDestination(vendor.createDestination(properties.getTo(), toAttributes));
-            }
-            if( properties.getSubject()!=null ) {
-                jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
-            }
-            if( properties.getReplyTo() !=null ) {
-                jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo(), replyToAttributes));
-            }
-            if( properties.getCorrelationId() !=null ) {
-                jms.setJMSCorrelationID(properties.getCorrelationId().toString());
-            }
-            if( properties.getContentType() !=null ) {
-                jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
-            }
-            if( properties.getContentEncoding() !=null ) {
-                jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
-            }
-            if( properties.getCreationTime()!=null ) {
-                jms.setJMSTimestamp(properties.getCreationTime().getTime());
-            }
-            if( properties.getGroupId()!=null ) {
-                vendor.setJMSXGroupID(jms, properties.getGroupId());
-            }
-            if( properties.getGroupSequence()!=null ) {
-                vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
-            }
-            if( properties.getReplyToGroupId()!=null ) {
-                jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
-            }
-            if( properties.getAbsoluteExpiryTime()!=null ) {
-                jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
-            }
-        }
-
-        // If the jms expiration has not yet been set...
-        if( jms.getJMSExpiration()==0 ) {
-            // Then lets try to set it based on the message ttl.
-            long ttl = defaultTtl;
-            if( header.getTtl()!=null ) {
-                ttl = header.getTtl().longValue();
-            }
-            if( ttl == 0 ) {
-                jms.setJMSExpiration(0);
-            } else {
-                jms.setJMSExpiration(System.currentTimeMillis()+ttl);
-            }
-        }
-
-        final Footer fp = amqp.getFooter();
-        if( fp !=null ) {
-            for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
-            }
-        }
-    }
-
-    // Attribute sets recognised by toClassFromAttributes.
-    private static final Set<String> QUEUE_ATTRIBUTES = createSet("queue");
-    private static final Set<String> TOPIC_ATTRIBUTES = createSet("topic");
-    private static final Set<String> TEMP_QUEUE_ATTRIBUTES = createSet("queue", "temporary");
-    private static final Set<String> TEMP_TOPIC_ATTRIBUTES = createSet("topic", "temporary");
-
-    /**
-     * Builds an unmodifiable set from the given strings.
-     */
-    private static Set<String> createSet(String ... args) {
-        HashSet<String> s = new HashSet<String>();
-        Collections.addAll(s, args);
-        return Collections.unmodifiableSet(s);
-    }
-
-    /**
-     * Maps a comma separated attribute list such as {@code "queue"} or
-     * {@code "topic,temporary"} to the matching JMS destination interface.
-     * The order of the attributes does not matter, and whitespace around the
-     * commas or around the whole value is ignored.
-     *
-     * @param value the attribute list, may be null
-     * @return {@code null} for a null value, the matching destination
-     *         interface for a recognised list, {@code Destination.class}
-     *         otherwise
-     */
-    Class<? extends Destination> toClassFromAttributes(String value)
-    {
-        if( value ==null ) {
-            return null;
-        }
-        HashSet<String> attributes = new HashSet<String>();
-        // Trim first: the split pattern only swallows whitespace next to commas.
-        Collections.addAll(attributes, value.trim().split("\\s*,\\s*"));
-
-        if( QUEUE_ATTRIBUTES.equals(attributes) ) {
-            return Queue.class;
-        }
-        if( TOPIC_ATTRIBUTES.equals(attributes) ) {
-            return Topic.class;
-        }
-        if( TEMP_QUEUE_ATTRIBUTES.equals(attributes) ) {
-            return TemporaryQueue.class;
-        }
-        if( TEMP_TOPIC_ATTRIBUTES.equals(attributes) ) {
-            return TemporaryTopic.class;
-        }
-        return Destination.class;
-    }
-
-
-    /**
-     * Sets a JMS property from an AMQP value, converting AMQP-specific types:
-     * unsigned integers use the narrowest of byte/short/int/long that holds
-     * the value, symbols and binaries are stored as their string form,
-     * decimals as float or double. Every other value, including null, is
-     * handed to {@code setObjectProperty} unchanged.
-     *
-     * @param msg the JMS message to modify
-     * @param key the property name
-     * @param value the AMQP value, may be null
-     * @throws JMSException if the JMS message rejects the property
-     */
-    private void setProperty(Message msg, String key, Object value) throws JMSException {
-        if( value instanceof UnsignedLong) {
-            // NOTE(review): values above Long.MAX_VALUE presumably come back
-            // negative from longValue() - confirm whether that is acceptable.
-            long v = ((UnsignedLong) value).longValue();
-            msg.setLongProperty(key, v);
-        } else if( value instanceof UnsignedInteger) {
-            long v = ((UnsignedInteger) value).longValue();
-            if( Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE ) {
-                msg.setIntProperty(key, (int) v);
-            } else {
-                msg.setLongProperty(key, v);
-            }
-        } else if( value instanceof UnsignedShort) {
-            int v = ((UnsignedShort) value).intValue();
-            if( Short.MIN_VALUE <= v && v <= Short.MAX_VALUE ) {
-                msg.setShortProperty(key, (short) v);
-            } else {
-                msg.setIntProperty(key, v);
-            }
-        } else if( value instanceof UnsignedByte) {
-            short v = ((UnsignedByte) value).shortValue();
-            if( Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE ) {
-                msg.setByteProperty(key, (byte) v);
-            } else {
-                msg.setShortProperty(key, v);
-            }
-        } else if( value instanceof Symbol) {
-            msg.setStringProperty(key, value.toString());
-        } else if( value instanceof Decimal128 ) {
-            msg.setDoubleProperty(key, ((Decimal128)value).doubleValue());
-        } else if( value instanceof Decimal64 ) {
-            msg.setDoubleProperty(key, ((Decimal64)value).doubleValue());
-        } else if( value instanceof Decimal32 ) {
-            msg.setFloatProperty(key, ((Decimal32)value).floatValue());
-        } else if( value instanceof Binary ) {
-            msg.setStringProperty(key, value.toString());
-        } else {
-            msg.setObjectProperty(key, value);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564a4d02/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformer.java
deleted file mode 100644
index 82c03c6..0000000
--- a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingInboundTransformer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.amqp.messaging.*;
-
-import javax.jms.*;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Inbound transformer that chooses the JMS message type from the body of the
- * decoded AMQP message:
- * <ul>
- * <li>no body: a plain message</li>
- * <li>{@code Data}: a {@code BytesMessage}</li>
- * <li>{@code AmqpSequence}: a {@code StreamMessage}</li>
- * <li>{@code AmqpValue}: chosen by the value. null gives an empty
- *     {@code ObjectMessage}, String a {@code TextMessage}, Binary a
- *     {@code BytesMessage}, List a {@code StreamMessage}, Map a
- *     {@code MapMessage}, anything else an {@code ObjectMessage}.</li>
- * </ul>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class JMSMappingInboundTransformer extends InboundTransformer {
-
-    public JMSMappingInboundTransformer(JMSVendor vendor) {
-        super(vendor);
-    }
-
-    /**
-     * Decodes the AMQP message, creates the JMS message matching its body,
-     * populates it via {@code populateMessage} and finally records the message
-     * format and a {@code NATIVE=false} marker as vendor-prefixed properties.
-     *
-     * @param amqpMessage the encoded AMQP message
-     * @return the populated JMS message
-     * @throws RuntimeException if the body section is of an unexpected type
-     * @throws Exception if decoding or a JMS call fails
-     */
-    @Override
-    public Message transform(EncodedMessage amqpMessage) throws Exception {
-        org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
-
-        Message rc;
-        final Section body = amqp.getBody();
-        if( body == null ) {
-            rc = vendor.createMessage();
-        } else if( body instanceof Data ) {
-            Binary d = ((Data) body).getValue();
-            BytesMessage m = vendor.createBytesMessage();
-            m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
-            rc = m;
-        } else if (body instanceof AmqpSequence ) {
-            AmqpSequence sequence = (AmqpSequence) body;
-            StreamMessage m = vendor.createStreamMessage();
-            for( Object item : sequence.getValue()) {
-                m.writeObject(item);
-            }
-            rc = m;
-        } else if (body instanceof AmqpValue) {
-            Object value = ((AmqpValue) body).getValue();
-            if( value == null ) {
-                rc = vendor.createObjectMessage();
-            } else if( value instanceof String ) {
-                // "else" matters: without it a null value fell into this chain
-                // and a second ObjectMessage was created for nothing.
-                TextMessage m = vendor.createTextMessage();
-                m.setText((String) value);
-                rc = m;
-            } else if( value instanceof Binary ) {
-                Binary d = (Binary) value;
-                BytesMessage m = vendor.createBytesMessage();
-                m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
-                rc = m;
-            } else if( value instanceof List) {
-                StreamMessage m = vendor.createStreamMessage();
-                for( Object item : (List<?>) value) {
-                    m.writeObject(item);
-                }
-                rc = m;
-            } else if( value instanceof Map) {
-                MapMessage m = vendor.createMapMessage();
-                for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
-                    // Keys are not necessarily Strings, so convert them
-                    // instead of relying on an unchecked cast.
-                    Object key = entry.getKey();
-                    m.setObject(key == null ? null : key.toString(), entry.getValue());
-                }
-                rc = m;
-            } else {
-                ObjectMessage m = vendor.createObjectMessage();
-                m.setObject((Serializable) value);
-                rc = m;
-            }
-        } else {
-            throw new RuntimeException("Unexpected body type: "+body.getClass());
-        }
-        rc.setJMSDeliveryMode(defaultDeliveryMode);
-        rc.setJMSPriority(defaultPriority);
-        // NOTE(review): defaultTtl is a duration, but it is written as the
-        // expiration here; a non-zero value also makes populateMessage skip
-        // its own TTL based expiration - confirm this is intended.
-        rc.setJMSExpiration(defaultTtl);
-
-        populateMessage(rc, amqp);
-
-        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
-        rc.setBooleanProperty(prefixVendor + "NATIVE", false);
-        return rc;
-    }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org