You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/22 17:56:22 UTC
[14/16] qpid-jms git commit: remove tmp module with old client work
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/engine/temp/Events.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/engine/temp/Events.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/engine/temp/Events.java
deleted file mode 100644
index 716f3d6..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/engine/temp/Events.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.engine.temp;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.engine.Event;
-
-/**
- * Events
- *
- * TODO: find a new home for this.
- */
-
-public final class Events
-{
- private static Logger LOGGER = Logger.getLogger(Events.class.getName());
-
- private Events() {}
-
- public static void dispatch(Event event, EventHandler handler) {
- //TODO: delete
- if(LOGGER.isLoggable(Level.FINEST))
- {
- String name = Thread.currentThread().getName();
- LOGGER.finest("[" + name + "] " + "Dispatching event:" + event);
- }
-
- switch (event.getType()) {
- case CONNECTION_INIT:
- handler.onInit(event.getConnection());
- break;
- case CONNECTION_OPEN:
- handler.onOpen(event.getConnection());
- break;
- case CONNECTION_REMOTE_OPEN:
- handler.onRemoteOpen(event.getConnection());
- break;
- case CONNECTION_CLOSE:
- handler.onClose(event.getConnection());
- break;
- case CONNECTION_REMOTE_CLOSE:
- handler.onRemoteClose(event.getConnection());
- break;
- case CONNECTION_FINAL:
- handler.onFinal(event.getConnection());
- break;
- case SESSION_INIT:
- handler.onInit(event.getSession());
- break;
- case SESSION_OPEN:
- handler.onOpen(event.getSession());
- break;
- case SESSION_REMOTE_OPEN:
- handler.onRemoteOpen(event.getSession());
- break;
- case SESSION_CLOSE:
- handler.onClose(event.getSession());
- break;
- case SESSION_REMOTE_CLOSE:
- handler.onRemoteClose(event.getSession());
- break;
- case SESSION_FINAL:
- handler.onFinal(event.getSession());
- break;
- case LINK_INIT:
- handler.onInit(event.getLink());
- break;
- case LINK_OPEN:
- handler.onOpen(event.getLink());
- break;
- case LINK_REMOTE_OPEN:
- handler.onRemoteOpen(event.getLink());
- break;
- case LINK_CLOSE:
- handler.onClose(event.getLink());
- break;
- case LINK_REMOTE_CLOSE:
- handler.onRemoteClose(event.getLink());
- break;
- case LINK_FLOW:
- handler.onFlow(event.getLink());
- break;
- case LINK_FINAL:
- handler.onFinal(event.getLink());
- break;
- case TRANSPORT:
- handler.onTransport(event.getTransport());
- break;
- case DELIVERY:
- handler.onDelivery(event.getDelivery());
- break;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
deleted file mode 100644
index 5d668aa..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
+++ /dev/null
@@ -1,568 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.jms.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
-
-import org.apache.qpid.jms.engine.AmqpBytesMessage;
-
-public class BytesMessageImpl extends MessageImpl<AmqpBytesMessage> implements BytesMessage
-{
- private OutputStreamHelper _outputStreamHelper = new OutputStreamHelper();
- private InputStreamHelper _inputStreamHelper = new InputStreamHelper();
-
- //message to be sent
- public BytesMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
- {
- super(new AmqpBytesMessage(), sessionImpl, connectionImpl);
- _outputStreamHelper.createOutputStreams();
- }
-
- //message just received
- public BytesMessageImpl(AmqpBytesMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
- {
- super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
- createDataInputStreamFromUnderlyingMessage();
- }
-
- private void createDataInputStreamFromUnderlyingMessage()
- {
- AmqpBytesMessage amqpBytesMessage = getUnderlyingAmqpMessage(false);
- ByteArrayInputStream byteArrayInputStream = amqpBytesMessage.getByteArrayInputStream();
-
- _inputStreamHelper.createNewInputStream(byteArrayInputStream);
- }
-
- @Override
- protected AmqpBytesMessage prepareUnderlyingAmqpMessageForSending(AmqpBytesMessage amqpMessage)
- {
- if(_outputStreamHelper.hasOutputStreams())
- {
- byte[] data = _outputStreamHelper.getByteOutput();
- amqpMessage.setBytes(data);
- }
- else
- {
- //We are sending what we received or set on it earlier.
- //Ensure the body is a Data section.
- amqpMessage.convertBodyToDataSectionIfNecessary();
- }
-
- return amqpMessage;
- }
-
- private JMSException createInputException(final IOException e)
- {
- if(e instanceof EOFException)
- {
- return new QpidJmsMessageEofException(e.getMessage(), e);
- }
- else
- {
- return new QpidJmsMessageFormatException(e.getMessage(), e);
- }
- }
-
- private JMSException createOutputException(final IOException e)
- {
- return new QpidJmsException(e.getMessage(), e);
- }
-
- void checkBodyReadable() throws MessageNotReadableException
- {
- if(isBodyWritable())
- {
- throw new MessageNotReadableException("Message body is currently in write-only mode");
- }
- }
-
- //======= JMS Methods =======
-
- @Override
- public long getBodyLength() throws JMSException
- {
- checkBodyReadable();
-
- return getUnderlyingAmqpMessage(false).getBytesLength();
- }
-
- @Override
- public boolean readBoolean() throws JMSException
- {
- checkBodyReadable();
-
- try
- {
- return _inputStreamHelper.readBoolean();
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public byte readByte() throws JMSException
- {
- checkBodyReadable();
-
- try
- {
- return _inputStreamHelper.readByte();
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public int readUnsignedByte() throws JMSException
- {
- checkBodyReadable();
-
- try
- {
- return _inputStreamHelper.readUnsignedByte();
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public short readShort() throws JMSException
- {
- checkBodyReadable();
-
- try
- {
- return _inputStreamHelper.readShort();
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public int readUnsignedShort() throws JMSException
- {
- checkBodyReadable();
-
- try
- {
- return _inputStreamHelper.readUnsignedShort();
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public char readChar() throws JMSException
- {
- checkBodyReadable();
-
- try
- {
- return _inputStreamHelper.readChar();
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public int readInt() throws JMSException
- {
- checkBodyReadable();
-
- try
- {
- return _inputStreamHelper.readInt();
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public long readLong() throws JMSException
- {
- checkBodyReadable();
-
- try
- {
- return _inputStreamHelper.readLong();
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public float readFloat() throws JMSException
- {
- checkBodyReadable();
-
- try
- {
- return _inputStreamHelper.readFloat();
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public double readDouble() throws JMSException
- {
- checkBodyReadable();
-
- try
- {
- return _inputStreamHelper.readDouble();
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public String readUTF() throws JMSException
- {
- checkBodyReadable();
-
- try
- {
- return _inputStreamHelper.readUTF();
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public int readBytes(byte[] value) throws JMSException
- {
- return readBytes(value, value.length);
- }
-
- @Override
- public int readBytes(byte[] value, int length) throws JMSException
- {
- checkBodyReadable();
-
- if(length < 0 || value.length < length)
- {
- throw new IndexOutOfBoundsException("length must not be negative or larger than the size of the provided array");
- }
-
- try
- {
- int offset = 0;
- while(offset < length)
- {
- int read = _inputStreamHelper.read(value, offset, length - offset);
- if(read < 0)
- {
- break;
- }
- offset += read;
- }
-
- if(offset == 0 && length != 0)
- {
- return -1;
- }
- else
- {
- return offset;
- }
- }
- catch (IOException e)
- {
- throw createInputException(e);
- }
- }
-
- @Override
- public void writeBoolean(boolean value) throws JMSException
- {
- checkBodyWritable();
-
- try
- {
- _outputStreamHelper.writeBoolean(value);
- }
- catch (IOException e)
- {
- throw createOutputException(e);
- }
- }
-
- @Override
- public void writeByte(byte value) throws JMSException
- {
- checkBodyWritable();
-
- try
- {
- _outputStreamHelper.writeByte(value);
- }
- catch (IOException e)
- {
- throw createOutputException(e);
- }
- }
-
- @Override
- public void writeShort(short value) throws JMSException
- {
- checkBodyWritable();
-
- try
- {
- _outputStreamHelper.writeShort(value);
- }
- catch (IOException e)
- {
- throw createOutputException(e);
- }
- }
-
- @Override
- public void writeChar(char value) throws JMSException
- {
- checkBodyWritable();
-
- try
- {
- _outputStreamHelper.writeChar(value);
- }
- catch (IOException e)
- {
- throw createOutputException(e);
- }
- }
-
- @Override
- public void writeInt(int value) throws JMSException
- {
- checkBodyWritable();
-
- try
- {
- _outputStreamHelper.writeInt(value);
- }
- catch (IOException e)
- {
- throw createOutputException(e);
- }
- }
-
- @Override
- public void writeLong(long value) throws JMSException
- {
- checkBodyWritable();
-
- try
- {
- _outputStreamHelper.writeLong(value);
- }
- catch (IOException e)
- {
- throw createOutputException(e);
- }
- }
-
- @Override
- public void writeFloat(float value) throws JMSException
- {
- checkBodyWritable();
-
- try
- {
- _outputStreamHelper.writeFloat(value);
- }
- catch (IOException e)
- {
- throw createOutputException(e);
- }
- }
-
- @Override
- public void writeDouble(double value) throws JMSException
- {
- checkBodyWritable();
-
- try
- {
- _outputStreamHelper.writeDouble(value);
- }
- catch (IOException e)
- {
- throw createOutputException(e);
- }
- }
-
- @Override
- public void writeUTF(String value) throws JMSException
- {
- checkBodyWritable();
-
- try
- {
- _outputStreamHelper.writeUTF(value);
- }
- catch (IOException e)
- {
- throw createOutputException(e);
- }
- }
-
- @Override
- public void writeBytes(byte[] bytes) throws JMSException
- {
- writeBytes(bytes, 0, bytes.length);
- }
-
- @Override
- public void writeBytes(byte[] value, int offset, int length) throws JMSException
- {
- checkBodyWritable();
- try
- {
- _outputStreamHelper.write(value, offset, length);
- }
- catch (IOException e)
- {
- throw createOutputException(e);
- }
- }
-
- @Override
- public void writeObject(Object value) throws JMSException
- {
- checkBodyWritable();
- if(value == null)
- {
- throw new NullPointerException("Value passed to BytesMessage.writeObject() must not be null");
- }
- else if (value instanceof Boolean)
- {
- writeBoolean((Boolean)value);
- }
- else if (value instanceof Byte)
- {
- writeByte((Byte)value);
- }
- else if (value instanceof Short)
- {
- writeShort((Short)value);
- }
- else if (value instanceof Character)
- {
- writeChar((Character)value);
- }
- else if (value instanceof Integer)
- {
- writeInt((Integer)value);
- }
- else if(value instanceof Long)
- {
- writeLong((Long)value);
- }
- else if(value instanceof Float)
- {
- writeFloat((Float) value);
- }
- else if(value instanceof Double)
- {
- writeDouble((Double) value);
- }
- else if(value instanceof String)
- {
- writeUTF((String) value);
- }
- else if(value instanceof byte[])
- {
- writeBytes((byte[])value);
- }
- else
- {
- throw new QpidJmsMessageFormatException("Value passed to BytesMessage.writeObject() must be of primitive type. Type passed was " + value.getClass().getName());
- }
- }
-
- @Override
- public void reset() throws JMSException
- {
- //If we have created an output stream previously, this is either
- //a new message or we cleared the body of a received message
- if(_outputStreamHelper.hasOutputStreams())
- {
- //update the underlying message and create new input stream based on the current output
- byte[] data = _outputStreamHelper.getByteOutput();
- getUnderlyingAmqpMessage(false).setBytes(data);
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
- _inputStreamHelper.createNewInputStream(bais);
-
- //clear the current output streams
- _outputStreamHelper.clearOutputStreams();
- }
- else
- {
- //This is a received message that has not
- //yet been cleared, recreate the input stream
- createDataInputStreamFromUnderlyingMessage();
- }
-
- setBodyWritable(false);
- }
-
- @Override
- public void clearBody() throws JMSException
- {
- //clear any prior input stream, and the underlying message body
- getUnderlyingAmqpMessage(false).setBytes(null);
- _inputStreamHelper.clearInputStream();
-
- //reset the output streams
- _outputStreamHelper.createOutputStreams();
-
- setBodyWritable(true);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
deleted file mode 100644
index bfaacce..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-public class ClientProperties
-{
- //JMS-defined Property Names
- public static final String JMSXUSERID = "JMSXUserID";
- public static final String JMSXGROUPID = "JMSXGroupID";
- public static final String JMSXGROUPSEQ = "JMSXGroupSeq";
-
- //Custom Message Property Names
- public static final String JMS_AMQP_TTL = "JMS_AMQP_TTL";
- public static final String JMS_AMQP_REPLY_TO_GROUP_ID = "JMS_AMQP_REPLY_TO_GROUP_ID";
- public static final String JMS_AMQP_TYPED_ENCODING = "JMS_AMQP_TYPED_ENCODING";
-
- //Message Annotation Names
- public static final String X_OPT_JMS_MSG_TYPE = "x-opt-jms-msg-type";
- public static final String X_OPT_APP_CORRELATION_ID = "x-opt-app-correlation-id";
- public static final String X_OPT_JMS_TYPE = "x-opt-jms-type";
-
- //Message Annotation Values
- //X_OPT_JMS_MSG_TYPE
- public static final byte GENERIC_MESSAGE_TYPE = 0;
- public static final byte OBJECT_MESSSAGE_TYPE = 1;
- public static final byte MAP_MESSAGE_TYPE = 2;
- public static final byte BYTES_MESSAGE_TYPE = 3;
- public static final byte STREAM_MESSAGE_TYPE = 4;
- public static final byte TEXT_MESSAGE_TYPE = 5;
-
- //Client configuration System Property names
- public static final String QPID_SET_JMSXUSERID_ON_SEND = "qpid.set-jmsxuserid-on-send";
- public static final String QPID_DEFAULT_CONSUMER_PREFETCH = "qpid.default-consumer-prefetch";
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java
deleted file mode 100644
index a7310e5..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-public class ConnectionException extends QpidJmsException
-{
- private static final long serialVersionUID = 419676688719664719L;
-
- public ConnectionException(String msg)
- {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
deleted file mode 100644
index 2f98932..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-import org.apache.qpid.jms.engine.AmqpConnection;
-import org.apache.qpid.jms.engine.AmqpConnectionDriverNetty;
-import org.apache.qpid.jms.engine.AmqpResourceRequest;
-import org.apache.qpid.jms.engine.AmqpSession;
-
-/**
- * A JMS connection.
- * Thread-safety:
- * <ul>
- * <li>All public methods are thread-safe</li>
- * <li>Other internal classes must use the connection's lock and state-change methods -
- * see {@link #lock()}/{@link #releaseLock()} and {@link #stateChanged()} for details.</li>
- * </ul>
- *
- * TODO wherever we throws JMSException, throw a subclass that has the cause set
- */
-public class ConnectionImpl implements Connection
-{
- private static final Logger _logger = Logger.getLogger(ConnectionImpl.class.getName());
-
- private AmqpConnection _amqpConnection;
-
- /** The driver dedicated to this connection */
- private AmqpConnectionDriverNetty _amqpConnectionDriver;
-
- private ConnectionLock _connectionLock;
-
- private volatile boolean _isStarted;
-
- private DestinationHelper _destinationHelper;
- private MessageIdHelper _messageIdHelper;
-
- private String _username;
-
- /**
- * TODO: accept a client id
- * TODO: defer connection to the broker if client has not been set. Defer it until any other method is called.
- */
- public ConnectionImpl(String clientName, String remoteHost, int port, String username, String password) throws JMSException
- {
- _username = username;
- _amqpConnection = new AmqpConnection(clientName, remoteHost, port);
- _amqpConnection.setUsername(_username);
- _amqpConnection.setPassword(password);
-
- try
- {
- _amqpConnectionDriver = new AmqpConnectionDriverNetty();
- _amqpConnectionDriver.registerConnection(_amqpConnection);
-
- _connectionLock = new ConnectionLock(this);
- _connectionLock.setConnectionStateChangeListener(new ConnectionStateChangeListener()
- {
- @Override
- public void stateChanged(ConnectionImpl connection)
- {
- connection._amqpConnectionDriver.setLocallyUpdated(connection._amqpConnection);
- }
- });
-
- connect();
- }
- catch (IOException e)
- {
- throw new QpidJmsException("Unable to create connection", e);
- }
-
- _destinationHelper = new DestinationHelper();
- _messageIdHelper = new MessageIdHelper();
- }
-
- AmqpConnection getAmqpConnection()
- {
- return _amqpConnection;
- }
-
- void waitUntil(Predicate condition, long timeoutMillis) throws JmsTimeoutException, JmsInterruptedException
- {
- long deadline = timeoutMillis < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeoutMillis;
-
- boolean wait = deadline > System.currentTimeMillis();
- boolean first = true;
- boolean done = false;
-
- synchronized (_amqpConnection)
- {
- while (first || (!done && wait))
- {
- if(_logger.isLoggable(Level.FINER))
- {
- _logger.log(Level.FINER,
- "About to waitUntil {0}. first={1}, done={2}, wait={3}",
- new Object[] {condition, first, done, wait});
- }
- if (wait && !done && !first)
- {
- try
- {
- _amqpConnection.wait(timeoutMillis < 0 ? 0 : deadline - System.currentTimeMillis());
- }
- catch (InterruptedException e)
- {
- //Note we are not setting the interrupted status, as it
- //is likely that user code will reenter the client code to
- //perform e.g close/rollback/etc and setting the status
- //could erroneously make those fail.
- throw new JmsInterruptedException("Interrupted while waiting for conditition: "
- + condition.getCurrentState() , e);
- }
- }
-
- wait = deadline > System.currentTimeMillis();
- done = done || condition.test();
- first = false;
- }
- if(_logger.isLoggable(Level.FINER))
- {
- _logger.log(Level.FINER,
- "Finished waitUntil {0}. first={1}, done={2}, wait={3}",
- new Object[] {condition, first, done, wait});
- }
-
- if (!done)
- {
- throw new JmsTimeoutException(timeoutMillis, condition.getCurrentState());
- }
- }
- }
-
- boolean isStarted()
- {
- return _isStarted;
- }
-
- private void connect() throws IOException, ConnectionException, JmsTimeoutException, JmsInterruptedException
- {
- lock();
- try
- {
- waitUntil(new SimplePredicate("Connection established or failed", _amqpConnection)
- {
- @Override
- public boolean test()
- {
- return _amqpConnection.isConnected() || _amqpConnection.isAuthenticationError() || _amqpConnection.getConnectionError().getCondition() != null;
- }
- }, AmqpConnection.TIMEOUT);
-
- //TODO: sort out exception throwing
- if(_amqpConnection.getConnectionError().getCondition() != null)
- {
- throw new ConnectionException("Connection failed: " + _amqpConnection.getConnectionError());
- }
-
- if(_amqpConnection.isAuthenticationError())
- {
- throw new ConnectionException("Connection failed: authentication failure");
- }
-
- if(!_amqpConnection.isConnected())
- {
- throw new ConnectionException("Connection failed");
- }
- }
- finally
- {
- releaseLock();
- }
- }
-
- /**
- * <p>
- * Acquire the connection lock.
- * </p>
- * <p>
- * Must be held by an application thread before reading or modifying
- * the state of this connection or any of its associated child objects
- * (e.g. sessions, senders, receivers, links, and messages).
- * Also must be held when calling {@link #stateChanged()}.
- * </p>
- * <p>
- * Following these rules ensures that this lock is acquired BEFORE the lock(s) managed by {@link AmqpConnection}.
- * </p>
- *
- * @see #releaseLock()
- */
- void lock()
- {
- _connectionLock.lock();
- }
-
- /**
- * @see #lock()
- */
- void releaseLock()
- {
- _connectionLock.unlock();
- }
-
- /**
- * Inform the connection that its state has been locally changed so that, for example,
- * it can schedule network I/O to occur.
- * The caller must first acquire the connection lock (via {@link #lock()}).
- */
- void stateChanged()
- {
- _connectionLock.stateChanged();
- }
-
- String getUserName()
- {
- return _username;
- }
-
- void waitForResult(AmqpResourceRequest<Void> request, String message) throws JMSException
- {
- try
- {
- request.getResult();
- }
- catch (IOException e)
- {
- throw new QpidJmsException(message, e);
- }
- }
-
- //======= JMS Methods =======
-
-
- @Override
- public void close() throws JMSException
- {
- //TODO: allow for concurrent/duplicate invocations
- lock();
- try
- {
- AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
-
- synchronized (_amqpConnection)
- {
- _amqpConnection.close(request);
- stateChanged();
- }
-
- waitForResult(request, "Exception while closing connection");
-
- _amqpConnectionDriver.stop();
-
- if(_amqpConnection.getConnectionError().getCondition() != null)
- {
- throw new ConnectionException("Connection close failed: " + _amqpConnection.getConnectionError());
- }
- }
- catch(InterruptedException e)
- {
- throw new JmsInterruptedException("Interrupted while trying to close connection", e);
- }
- finally
- {
- releaseLock();
- }
- }
-
- @Override
- public SessionImpl createSession(boolean transacted, int acknowledgeMode) throws JMSException
- {
- if(transacted)
- {
- throw new UnsupportedOperationException("Only transacted=false is currently supported");
- }
- if(acknowledgeMode != Session.AUTO_ACKNOWLEDGE)
- {
- throw new UnsupportedOperationException("Only acknowledgeMode=AUTO_ACKNOWLEDGE is currently supported");
- }
-
- lock();
- try
- {
- AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
-
- SessionImpl session = null;
- synchronized (_amqpConnection)
- {
- AmqpSession amqpSession = _amqpConnection.createSession();
- session = new SessionImpl(acknowledgeMode, amqpSession, this, _destinationHelper, _messageIdHelper);
- session.open(request);
- stateChanged();
- }
-
- waitForResult(request, "Exception while creating session");
-
- return session;
- }
- finally
- {
- releaseLock();
- }
- }
-
- @Override
- public String getClientID() throws JMSException
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
- }
-
- @Override
- public void setClientID(String clientID) throws JMSException
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
- }
-
- @Override
- public ConnectionMetaData getMetaData() throws JMSException
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
- }
-
- @Override
- public ExceptionListener getExceptionListener() throws JMSException
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
- }
-
- @Override
- public void setExceptionListener(ExceptionListener listener) throws JMSException
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
- }
-
- @Override
- public void start() throws JMSException
- {
- _isStarted = true;
- }
-
- @Override
- public void stop() throws JMSException
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
- }
-
- @Override
- public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
- }
-
- @Override
- public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionLock.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionLock.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionLock.java
deleted file mode 100644
index c8671e4..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionLock.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-import java.util.concurrent.locks.ReentrantLock;
-
-public class ConnectionLock extends ReentrantLock
-{
- private static final long serialVersionUID = 1006213282468441380L;
-
- private ConnectionStateChangeListener _listener;
- private ConnectionImpl _connection;
-
- public ConnectionLock(ConnectionImpl connection)
- {
- _connection = connection;
- }
-
- public void setConnectionStateChangeListener(ConnectionStateChangeListener listener)
- {
- _listener = listener;
- }
-
- public void stateChanged()
- {
- if(_listener != null)
- {
- _listener.stateChanged(_connection);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionStateChangeListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionStateChangeListener.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionStateChangeListener.java
deleted file mode 100644
index 625983a..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/ConnectionStateChangeListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-public interface ConnectionStateChangeListener
-{
- void stateChanged(ConnectionImpl connection);
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java
deleted file mode 100644
index 7cd368a..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.Topic;
-
-public class DestinationHelper
-{
- public static final String TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-to-type";
- public static final String REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-reply-type";
-
- static final String QUEUE_ATTRIBUTE = "queue";
- static final String TOPIC_ATTRIBUTE = "topic";
- static final String TEMPORARY_ATTRIBUTE = "temporary";
-
- public static final String QUEUE_ATTRIBUTES_STRING = QUEUE_ATTRIBUTE;
- public static final String TOPIC_ATTRIBUTES_STRING = TOPIC_ATTRIBUTE;
- public static final String TEMP_QUEUE_ATTRIBUTES_STRING = QUEUE_ATTRIBUTE + "," + TEMPORARY_ATTRIBUTE;
- public static final String TEMP_TOPIC_ATTRIBUTES_STRING = TOPIC_ATTRIBUTE + "," + TEMPORARY_ATTRIBUTE;
-
- public DestinationHelper()
- {
- }
-
- public Queue createQueue(String address)
- {
- return new QueueImpl(address);
- }
-
- public Topic createTopic(String address)
- {
- return new TopicImpl(address);
- }
-
- private Destination createBaseDestination(String address)
- {
- return new DestinationImpl(address);
- }
-
- /**
- * Decode the provided address, type description, and consumer destination information such that
- * an appropriate Destination object can be returned.
- *
- * If an address and type description is provided then this will be used to create the Destination. If
- * the type information is missing, it will be derived from the consumer destination if present, or
- * default to a generic destination if not.
- *
- * If the address is null then the consumer destination is returned, unless the
- * useConsumerDestForTypeOnly flag is true, in which case null will be returned.
- */
- public Destination decodeDestination(String address, String typeString, Destination consumerDestination, boolean useConsumerDestForTypeOnly)
- {
- Set<String> typeSet = null;
-
- if(typeString != null)
- {
- typeSet = splitAttributes(typeString);
- }
-
- return createDestination(address, typeSet, consumerDestination, useConsumerDestForTypeOnly);
- }
-
- private Destination createDestination(String address, Set<String> typeSet, Destination consumerDestination, boolean useConsumerDestForTypeOnly)
- {
- if(address == null)
- {
- return useConsumerDestForTypeOnly ? null : consumerDestination;
- }
-
- if(typeSet != null && !typeSet.isEmpty())
- {
- if(typeSet.contains(QUEUE_ATTRIBUTE))
- {
- if(typeSet.contains(TEMPORARY_ATTRIBUTE))
- {
- //TODO
- throw new IllegalArgumentException("TemporaryQueue not yet supported");
- }
- else
- {
- return createQueue(address);
- }
- }
- else if(typeSet.contains(TOPIC_ATTRIBUTE))
- {
- if(typeSet.contains(TEMPORARY_ATTRIBUTE))
- {
- //TODO
- throw new IllegalArgumentException("TemporaryTopic not yet supported");
- }
- else
- {
- return createTopic(address);
- }
- }
- }
-
- if(consumerDestination instanceof TemporaryQueue)
- {
- //TODO
- throw new IllegalArgumentException("Unsupported Destination type: " + consumerDestination.getClass().getName());
- }
- else if(consumerDestination instanceof TemporaryTopic)
- {
- //TODO
- throw new IllegalArgumentException("Unsupported Destination type: " + consumerDestination.getClass().getName());
- }
- else if(consumerDestination instanceof Queue)
- {
- return createQueue(address);
- }
- else if(consumerDestination instanceof Topic)
- {
- return createTopic(address);
- }
-
- //fall back to a straight Destination
- return createBaseDestination(address);
- }
-
- public Destination convertToQpidDestination(Destination dest) throws JMSException
- {
- if(dest == null)
- {
- return null;
- }
-
- if(isQpidDestination(dest))
- {
- return dest;
- }
- else
- {
- if(dest instanceof TemporaryQueue)
- {
- //TODO
- throw new IllegalArgumentException("Unsupported Destination type: " + dest.getClass().getName());
- }
- else if(dest instanceof TemporaryTopic)
- {
- //TODO
- throw new IllegalArgumentException("Unsupported Destination type: " + dest.getClass().getName());
- }
- else if(dest instanceof Queue)
- {
- return createQueue(((Queue) dest).getQueueName());
- }
- else if(dest instanceof Topic)
- {
- return createTopic(((Topic) dest).getTopicName());
- }
- else
- {
- throw new IllegalArgumentException("Unsupported Destination type: " + dest.getClass().getName());
- }
- }
- }
-
- public boolean isQpidDestination(Destination dest)
- {
- return dest instanceof DestinationImpl;
- }
-
- public String decodeAddress(Destination destination) throws JMSException
- {
- if(destination == null)
- {
- return null;
- }
-
- if(!isQpidDestination(destination))
- {
- destination = convertToQpidDestination(destination);
- }
-
- if(destination instanceof Queue)
- {
- return ((Queue) destination).getQueueName();
- }
- else if(destination instanceof Topic)
- {
- return ((Topic) destination).getTopicName();
- }
- else
- {
- throw new IllegalArgumentException("Support for those destinations not yet implemented");
- }
- }
-
- /**
- * @return the annotation type string, or null if the supplied destination is null or can't be classified
- */
- public String decodeTypeString(Destination destination)
- {
- if(destination == null)
- {
- return null;
- }
-
- if(destination instanceof TemporaryQueue)
- {
- return TEMP_QUEUE_ATTRIBUTES_STRING;
- }
- else if(destination instanceof Queue)
- {
- return QUEUE_ATTRIBUTES_STRING;
- }
- else if(destination instanceof TemporaryTopic)
- {
- return TEMP_TOPIC_ATTRIBUTES_STRING;
- }
- else if(destination instanceof Topic)
- {
- return TOPIC_ATTRIBUTES_STRING;
- }
- else
- {
- //unable to classify
- return null;
- }
- }
-
- Set<String> splitAttributes(String typeString)
- {
- if( typeString == null )
- {
- return null;
- }
-
- HashSet<String> typeSet = new HashSet<String>();
-
- //Split string on commas and their surrounding whitespace
- for( String attr : typeString.split("\\s*,\\s*") )
- {
- //ignore empty values
- if(!attr.equals(""))
- {
- typeSet.add(attr);
- }
- }
-
- return typeSet;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/DestinationImpl.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/DestinationImpl.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/DestinationImpl.java
deleted file mode 100644
index a0e9973..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/DestinationImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-import javax.jms.Destination;
-
-public class DestinationImpl implements Destination
-{
- private String _address;
-
- public DestinationImpl(String address)
- {
- if(address == null)
- {
- throw new IllegalArgumentException("Destination address must not be null");
- }
-
- _address = address;
- }
-
- public String getAddress()
- {
- return _address;
- }
-
- @Override
- public String toString()
- {
- return getAddress();
- }
-
- @Override
- public int hashCode()
- {
- return getAddress().hashCode();
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
-
- if (o != null && getClass() == o.getClass())
- {
- return getAddress().equals(((DestinationImpl)o).getAddress());
- }
- else
- {
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java
deleted file mode 100644
index c0414f7..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.jms.impl;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import org.apache.qpid.jms.engine.AmqpGenericMessage;
-
-public class GenericAmqpMessageImpl extends MessageImpl<AmqpGenericMessage> implements Message
-{
- //message to be sent
- public GenericAmqpMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
- {
- super(new AmqpGenericMessage(), sessionImpl, connectionImpl);
- }
-
- //message just received
- public GenericAmqpMessageImpl(AmqpGenericMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
- {
- super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
- }
-
- @Override
- protected AmqpGenericMessage prepareUnderlyingAmqpMessageForSending(AmqpGenericMessage amqpMessage)
- {
- //TODO
- throw new UnsupportedOperationException("Not Implemented");
- }
-
- @Override
- public void clearBody() throws JMSException
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/IdConversionException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/IdConversionException.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/IdConversionException.java
deleted file mode 100644
index 79d079b..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/IdConversionException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-public class IdConversionException extends QpidJmsException
-{
- private static final long serialVersionUID = -2349723813650476823L;
-
- public IdConversionException(String reason)
- {
- super(reason);
- }
-
- public IdConversionException(String reason, Exception cause)
- {
- super(reason, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/InputStreamHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/InputStreamHelper.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/InputStreamHelper.java
deleted file mode 100644
index 8b3ce29..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/InputStreamHelper.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-public class InputStreamHelper
-{
- private DataInputStream _dataInputStream;
-
- public InputStreamHelper()
- {
- }
-
- public void createNewInputStream(ByteArrayInputStream bais)
- {
- _dataInputStream = new DataInputStream(bais);
- }
-
- public void clearInputStream()
- {
- _dataInputStream = null;
- }
-
- /**
- * @see DataInputStream#readBoolean()
- * @throws IOException
- */
- public boolean readBoolean() throws IOException
- {
- return _dataInputStream.readBoolean();
- }
-
- /**
- * @see DataInputStream#readByte()
- * @throws IOException
- */
- public byte readByte() throws IOException
- {
- return _dataInputStream.readByte();
- }
-
- /**
- * @see DataInputStream#readUnsignedByte()
- * @throws IOException
- */
- public int readUnsignedByte() throws IOException
- {
- return _dataInputStream.readUnsignedByte();
- }
-
- /**
- * @see DataInputStream#readShort()
- * @throws IOException
- */
- public short readShort() throws IOException
- {
- return _dataInputStream.readShort();
- }
-
- /**
- * @see DataInputStream#readUnsignedShort()
- * @throws IOException
- */
- public int readUnsignedShort() throws IOException
- {
- return _dataInputStream.readUnsignedShort();
- }
-
- /**
- * @see DataInputStream#readChar()
- * @throws IOException
- */
- public char readChar() throws IOException
- {
- return _dataInputStream.readChar();
- }
-
- /**
- * @see DataInputStream#readInt()
- * @throws IOException
- */
- public int readInt() throws IOException
- {
- return _dataInputStream.readInt();
- }
-
- /**
- * @see DataInputStream#readLong()
- * @throws IOException
- */
- public long readLong() throws IOException
- {
- return _dataInputStream.readLong();
- }
-
- /**
- * @see DataInputStream#readFloat()
- * @throws IOException
- */
- public float readFloat() throws IOException
- {
- return _dataInputStream.readFloat();
- }
-
- /**
- * @see DataInputStream#readDouble()
- * @throws IOException
- */
- public double readDouble() throws IOException
- {
- return _dataInputStream.readDouble();
- }
-
- /**
- * @see DataInputStream#readUTF()
- * @throws IOException
- */
- public String readUTF() throws IOException
- {
- return _dataInputStream.readUTF();
- }
-
- /**
- * @see DataInputStream#read(byte[], int, int)
- * @throws IOException
- */
- public int read(byte dest[], int offset, int length) throws IOException
- {
- return _dataInputStream.read(dest, offset, length);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java
deleted file mode 100644
index e2872ad..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/JmsInterruptedException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-/**
- * Used to signal that we have been interrupted.
- *
- * When we throw this, the interrupted status is not set on the thread.
- */
-public class JmsInterruptedException extends QpidJmsException
-{
- private static final long serialVersionUID = 384180653752426597L;
-
- public JmsInterruptedException(String reason, InterruptedException cause)
- {
- super(reason, cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java
deleted file mode 100644
index 9a966ed..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/JmsTimeoutException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-public class JmsTimeoutException extends QpidJmsException
-{
- private static final long serialVersionUID = 7486676055343430641L;
-
- public JmsTimeoutException(long timeoutMillis, String pendingCondition)
- {
- super("Timed out after " + timeoutMillis + " ms waiting for condition: " + pendingCondition);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/LinkException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/LinkException.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/LinkException.java
deleted file mode 100644
index 45a4c96..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/LinkException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-public class LinkException extends QpidJmsException
-{
- private static final long serialVersionUID = 419676688719664719L;
-
- public LinkException(String msg)
- {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
deleted file mode 100644
index ca8ecc1..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-import javax.jms.JMSException;
-
-import org.apache.qpid.jms.engine.AmqpLink;
-import org.apache.qpid.jms.engine.AmqpResourceRequest;
-
-public class LinkImpl
-{
- private ConnectionImpl _connectionImpl;
- private AmqpLink _amqpLink;
-
- public LinkImpl(ConnectionImpl connectionImpl, AmqpLink amqpLink)
- {
- _connectionImpl = connectionImpl;
- _amqpLink = amqpLink;
- }
-
- public void close() throws JMSException
- {
- _connectionImpl.lock();
- try
- {
- AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
-
- synchronized (_connectionImpl.getAmqpConnection())
- {
- _amqpLink.close(request);
- _connectionImpl.stateChanged();
- }
-
- _connectionImpl.waitForResult(request, "Exception while closing link");
- }
- finally
- {
- _connectionImpl.releaseLock();
- }
- }
-
- ConnectionImpl getConnectionImpl()
- {
- return _connectionImpl;
- }
-
- public void open(AmqpResourceRequest<Void> request) throws JmsTimeoutException, JmsInterruptedException
- {
- _amqpLink.open(request);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java
deleted file mode 100644
index 707c825..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.jms.impl;
-
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Set;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageNotWriteableException;
-
-import org.apache.qpid.jms.engine.AmqpMapMessage;
-
-public class MapMessageImpl extends MessageImpl<AmqpMapMessage> implements MapMessage
-{
- //message to be sent
- public MapMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
- {
- super(new AmqpMapMessage(), sessionImpl, connectionImpl);
- }
-
- //message just received
- public MapMessageImpl(AmqpMapMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
- {
- super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
- }
-
- @Override
- protected AmqpMapMessage prepareUnderlyingAmqpMessageForSending(AmqpMapMessage amqpMessage)
- {
- //Currently nothing to do, we always operate directly on the underlying AmqpMapMessage.
- return amqpMessage;
- }
-
- private void setMapEntry(String name, Object value) throws IllegalArgumentException, MessageNotWriteableException
- {
- validateMapKeyName(name);
- checkBodyWritable();
-
- getUnderlyingAmqpMessage(false).setMapEntry(name, value);
- }
-
- private Object getMapEntry(String name) throws IllegalArgumentException
- {
- validateMapKeyName(name);
-
- return getUnderlyingAmqpMessage(false).getMapEntry(name);
- }
-
- private void validateMapKeyName(String name) throws IllegalArgumentException
- {
- if (name == null)
- {
- throw new IllegalArgumentException("Map key name must not be null");
- }
- else if (name.length() == 0)
- {
- throw new IllegalArgumentException("Map key name must not be the empty string");
- }
- }
-
- private boolean checkObjectMapValueIsValid(Object object) throws MessageFormatException
- {
- boolean valid = object instanceof Boolean || object instanceof Byte || object instanceof Short ||
- object instanceof Integer || object instanceof Long || object instanceof Float ||
- object instanceof Double || object instanceof String|| object instanceof Character ||
- object instanceof byte[] || object == null;
- if(!valid)
- {
- throw new QpidJmsMessageFormatException("Invalid object value type: " + object.getClass());
- }
-
- return true;
- }
-
- //======= JMS Methods =======
-
- @Override
- public boolean getBoolean(String name) throws JMSException
- {
- Object value = getMapEntry(name);
-
- if (value instanceof Boolean)
- {
- return ((Boolean) value).booleanValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Boolean.valueOf((String) value);
- }
- else
- {
- throw new MessageFormatException("Map entry " + name + " of type " + value.getClass().getName() + " cannot be converted to boolean.");
- }
- }
-
- @Override
- public byte getByte(String name) throws JMSException
- {
- Object value = getMapEntry(name);
-
- if (value instanceof Byte)
- {
- return ((Byte) value).byteValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Byte.valueOf((String) value).byteValue();
- }
- else
- {
- throw new MessageFormatException("Map entry " + name + " of type " + value.getClass().getName() + " cannot be converted to byte.");
- }
- }
-
- @Override
- public short getShort(String name) throws JMSException
- {
- Object value = getMapEntry(name);
-
- if (value instanceof Short)
- {
- return ((Short) value).shortValue();
- }
- else if (value instanceof Byte)
- {
- return ((Byte) value).shortValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Short.valueOf((String) value).shortValue();
- }
- else
- {
- throw new MessageFormatException("Map entry " + name + " of type " + value.getClass().getName() + " cannot be converted to short.");
- }
- }
-
- @Override
- public char getChar(String name) throws JMSException
- {
- Object value = getMapEntry(name);
-
- if ((value instanceof Character))
- {
- return (char) value;
- }
- else if(value == null)
- {
- throw new NullPointerException("Map entry " + name + " with null value cannot be converted to char.");
- }
- else
- {
- throw new MessageFormatException("Map entry " + name + " of type " + value.getClass().getName() + " cannot be converted to char.");
- }
- }
-
- @Override
- public int getInt(String name) throws JMSException
- {
- Object value = getMapEntry(name);
-
- if (value instanceof Integer)
- {
- return ((Integer) value).intValue();
- }
- else if (value instanceof Short)
- {
- return ((Short) value).intValue();
- }
- else if (value instanceof Byte)
- {
- return ((Byte) value).intValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Integer.valueOf((String) value).intValue();
- }
- else
- {
- throw new MessageFormatException("Map entry " + name + " of type " + value.getClass().getName() + " cannot be converted to int.");
- }
- }
-
- @Override
- public long getLong(String name) throws JMSException
- {
- Object value = getMapEntry(name);
-
- if (value instanceof Long)
- {
- return ((Long) value).longValue();
- }
- else if (value instanceof Integer)
- {
- return ((Integer) value).longValue();
- }
- else if (value instanceof Short)
- {
- return ((Short) value).longValue();
- }
- else if (value instanceof Byte)
- {
- return ((Byte) value).longValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Long.valueOf((String) value).longValue();
- }
- else
- {
- throw new MessageFormatException("Map entry " + name + " of type " + value.getClass().getName() + " cannot be converted to long.");
- }
- }
-
- @Override
- public float getFloat(String name) throws JMSException
- {
- Object value = getMapEntry(name);
-
- if (value instanceof Float)
- {
- return ((Float) value).floatValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Float.valueOf((String) value).floatValue();
- }
- else
- {
- throw new MessageFormatException("Map entry " + name + " of type " + value.getClass().getName() + " cannot be converted to float.");
- }
- }
-
- @Override
- public double getDouble(String name) throws JMSException
- {
- Object value = getMapEntry(name);
-
- if (value instanceof Double)
- {
- return ((Double) value).doubleValue();
- }
- else if (value instanceof Float)
- {
- return ((Float) value).doubleValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Double.valueOf((String) value).doubleValue();
- }
- else
- {
- throw new MessageFormatException("Map entry " + name + " of type " + value.getClass().getName() + " cannot be converted to double.");
- }
- }
-
- @Override
- public String getString(String name) throws JMSException
- {
- Object value = getMapEntry(name);
-
- if ((value instanceof String) || (value == null))
- {
- return (String) value;
- }
- else if (value instanceof byte[])
- {
- throw new MessageFormatException("Map entry " + name + " of type byte[] " + "cannot be converted to String.");
- }
- else
- {
- return value.toString();
- }
- }
-
- @Override
- public byte[] getBytes(String name) throws JMSException
- {
- Object value = getMapEntry(name);
-
- if ((value instanceof byte[]) || (value == null))
- {
- return (byte[]) value;
- }
- else
- {
- throw new MessageFormatException("Map entry " + name + " of type " + value.getClass().getName() + " cannot be converted to byte[].");
- }
- }
-
- @Override
- public Object getObject(String name) throws JMSException
- {
- return getMapEntry(name);
- }
-
- @Override
- public Enumeration<?> getMapNames() throws JMSException
- {
- Set<String> names = getUnderlyingAmqpMessage(false).getMapKeys();
-
- return Collections.enumeration(names);
- }
-
- @Override
- public void setBoolean(String name, boolean value) throws JMSException
- {
- setMapEntry(name, value);
- }
-
- @Override
- public void setByte(String name, byte value) throws JMSException
- {
- setMapEntry(name, value);
- }
-
- @Override
- public void setShort(String name, short value) throws JMSException
- {
- setMapEntry(name, value);
- }
-
- @Override
- public void setChar(String name, char value) throws JMSException
- {
- setMapEntry(name, value);
- }
-
- @Override
- public void setInt(String name, int value) throws JMSException
- {
- setMapEntry(name, value);
- }
-
- @Override
- public void setLong(String name, long value) throws JMSException
- {
- setMapEntry(name, value);
- }
-
- @Override
- public void setFloat(String name, float value) throws JMSException
- {
- setMapEntry(name, value);
- }
-
- @Override
- public void setDouble(String name, double value) throws JMSException
- {
- setMapEntry(name, value);
- }
-
- @Override
- public void setString(String name, String value) throws JMSException
- {
- setMapEntry(name, value);
- }
-
- @Override
- public void setBytes(String name, byte[] value) throws JMSException
- {
- int length = 0;
- if(value != null)
- {
- length = value.length;
- }
-
- setBytes(name, value, 0, length);
- }
-
- @Override
- public void setBytes(String name, byte[] value, int offset, int length)
- throws JMSException
- {
- byte[] dest;
-
- if(value == null)
- {
- dest = null;
- }
- else
- {
- dest = new byte[length];
- System.arraycopy(value, offset, dest, 0, length);
- }
-
- setMapEntry(name, dest);
- }
-
- @Override
- public void setObject(String name, Object value) throws JMSException
- {
- checkObjectMapValueIsValid(value);
- setMapEntry(name, value);
- }
-
- @Override
- public boolean itemExists(String name) throws JMSException
- {
- return getUnderlyingAmqpMessage(false).mapEntryExists(name);
- }
-
- @Override
- public void clearBody() throws JMSException
- {
- getUnderlyingAmqpMessage(false).clearMapEntries();
- setBodyWritable(true);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6f106f64/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java b/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java
deleted file mode 100644
index 5be6d63..0000000
--- a/qpid-jms-client-tmp/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms.impl;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import org.apache.qpid.jms.engine.AmqpBytesMessage;
-import org.apache.qpid.jms.engine.AmqpGenericMessage;
-import org.apache.qpid.jms.engine.AmqpListMessage;
-import org.apache.qpid.jms.engine.AmqpMapMessage;
-import org.apache.qpid.jms.engine.AmqpMessage;
-import org.apache.qpid.jms.engine.AmqpObjectMessage;
-import org.apache.qpid.jms.engine.AmqpTextMessage;
-
-public class MessageFactoryImpl
-{
- Message createJmsMessage(AmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
- {
- if(amqpMessage instanceof AmqpTextMessage)
- {
- return new TextMessageImpl((AmqpTextMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
- }
- else if(amqpMessage instanceof AmqpBytesMessage)
- {
- return new BytesMessageImpl((AmqpBytesMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
- }
- else if(amqpMessage instanceof AmqpObjectMessage)
- {
- return new ObjectMessageImpl((AmqpObjectMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
- }
- else if(amqpMessage instanceof AmqpListMessage)
- {
- return new StreamMessageImpl((AmqpListMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
- }
- else if(amqpMessage instanceof AmqpMapMessage)
- {
- return new MapMessageImpl((AmqpMapMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
- }
- else if(amqpMessage instanceof AmqpGenericMessage)
- {
- return new GenericAmqpMessageImpl((AmqpGenericMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
- }
- else
- {
- //TODO: support other message types
- throw new QpidJmsException("Unknown Message Type");
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org