You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 00:05:07 UTC
svn commit: r1534394 [9/22] - in /qpid/branches/linearstore/qpid: ./ cpp/
cpp/bindings/qmf2/examples/python/ cpp/bindings/qmf2/python/
cpp/bindings/qpid/dotnet/ cpp/etc/ cpp/examples/ cpp/examples/messaging/
cpp/examples/qmf-agent/ cpp/include/qpid/ cp...
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java Mon Oct 21 22:04:51 2013
@@ -1,538 +1,538 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.BytesMessage;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import java.io.*;
-import java.util.*;
-
-public class BytesMessageImpl extends MessageImpl implements BytesMessage
-{
- private DataInputStream _dataAsInput;
- private DataOutputStream _dataAsOutput;
- private ByteArrayOutputStream _bytesOut;
- private Data _dataIn;
-
- // message created for reading
- protected BytesMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Data data,
- Footer footer, SessionImpl session)
- {
- super(header, messageAnnotations, properties, appProperties, footer, session);
- _dataIn = data;
- final Binary dataBuffer = data.getValue();
- _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
-
- }
-
- // message created to be sent
- protected BytesMessageImpl(final SessionImpl session)
- {
- super(new Header(),
- new MessageAnnotations(new HashMap()),
- new Properties(),
- new ApplicationProperties(new HashMap()),
- new Footer(Collections.EMPTY_MAP),
- session);
-
- _bytesOut = new ByteArrayOutputStream();
- _dataAsOutput = new DataOutputStream(_bytesOut);
- }
-
-
- private Data getDataSection()
- {
- if(_bytesOut != null)
- {
- return new Data(new Binary(_bytesOut.toByteArray()));
- }
- else
- {
- return _dataIn;
- }
- }
-
- @Override
- protected boolean isReadOnly()
- {
- return _dataIn != null;
- }
-
- public long getBodyLength() throws JMSException
- {
- checkReadable();
- return getDataSection().getValue().getLength();
- }
-
- public boolean readBoolean() throws JMSException
- {
- checkReadable();
- try
- {
- return _dataAsInput.readBoolean();
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
-
- public byte readByte() throws JMSException
- {
- checkReadable();
- try
- {
- return _dataAsInput.readByte();
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
- public int readUnsignedByte() throws JMSException
- {
- checkReadable();
- try
- {
- return _dataAsInput.readUnsignedByte();
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
- public short readShort() throws JMSException
- {
- checkReadable();
- try
- {
- return _dataAsInput.readShort();
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
- public int readUnsignedShort() throws JMSException
- {
- checkReadable();
- try
- {
- return _dataAsInput.readUnsignedShort();
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
- public char readChar() throws JMSException
- {
- checkReadable();
- try
- {
- return _dataAsInput.readChar();
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
- public int readInt() throws JMSException
- {
- checkReadable();
- try
- {
- return _dataAsInput.readInt();
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
- public long readLong() throws JMSException
- {
- checkReadable();
- try
- {
- return _dataAsInput.readLong();
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
- public float readFloat() throws JMSException
- {
- checkReadable();
- try
- {
- return _dataAsInput.readFloat();
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
- public double readDouble() throws JMSException
- {
- checkReadable();
- try
- {
- return _dataAsInput.readDouble();
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
- public String readUTF() throws JMSException
- {
- checkReadable();
- try
- {
- return _dataAsInput.readUTF();
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
- public int readBytes(byte[] bytes) throws JMSException
- {
-
- return readBytes(bytes, bytes.length);
- }
-
- public int readBytes(byte[] bytes, int length) throws JMSException
- {
- checkReadable();
-
- try
- {
- int offset = 0;
- while(offset < length)
- {
- int read = _dataAsInput.read(bytes, offset, length - offset);
- if(read < 0)
- {
- break;
- }
- offset += read;
- }
-
- if(offset == 0 && length != 0)
- {
- return -1;
- }
- else
- {
- return offset;
- }
- }
- catch (IOException e)
- {
- throw handleInputException(e);
- }
- }
-
- public void writeBoolean(boolean b) throws JMSException
- {
- checkWritable();
- try
- {
- _dataAsOutput.writeBoolean(b);
- }
- catch (IOException e)
- {
- throw handleOutputException(e);
- }
-
- }
-
- public void writeByte(byte b) throws JMSException
- {
- checkWritable();
- try
- {
- _dataAsOutput.writeByte(b);
- }
- catch (IOException e)
- {
- throw handleOutputException(e);
- }
- }
-
- public void writeShort(short i) throws JMSException
- {
- checkWritable();
- try
- {
- _dataAsOutput.writeShort(i);
- }
- catch (IOException e)
- {
- throw handleOutputException(e);
- }
- }
-
- public void writeChar(char c) throws JMSException
- {
- checkWritable();
- try
- {
- _dataAsOutput.writeChar(c);
- }
- catch (IOException e)
- {
- throw handleOutputException(e);
- }
- }
-
- public void writeInt(int i) throws JMSException
- {
- checkWritable();
- try
- {
- _dataAsOutput.writeInt(i);
- }
- catch (IOException e)
- {
- throw handleOutputException(e);
- }
- }
-
- public void writeLong(long l) throws JMSException
- {
- checkWritable();
- try
- {
- _dataAsOutput.writeLong(l);
- }
- catch (IOException e)
- {
- throw handleOutputException(e);
- }
- }
-
- public void writeFloat(float v) throws JMSException
- {
- checkWritable();
- try
- {
- _dataAsOutput.writeFloat(v);
- }
- catch (IOException e)
- {
- throw handleOutputException(e);
- }
- }
-
- public void writeDouble(double v) throws JMSException
- {
- checkWritable();
- try
- {
- _dataAsOutput.writeDouble(v);
- }
- catch (IOException e)
- {
- throw handleOutputException(e);
- }
- }
-
- public void writeUTF(String s) throws JMSException
- {
- checkWritable();
- try
- {
- _dataAsOutput.writeUTF(s);
- }
- catch (IOException e)
- {
- throw handleOutputException(e);
- }
- }
-
- public void writeBytes(byte[] bytes) throws JMSException
- {
- checkWritable();
- try
- {
- _dataAsOutput.write(bytes);
- }
- catch (IOException e)
- {
- throw handleOutputException(e);
- }
- }
-
- public void writeBytes(byte[] bytes, int off, int len) throws JMSException
- {
- checkWritable();
- try
- {
- _dataAsOutput.write(bytes, off, len);
- }
- catch (IOException e)
- {
- throw handleOutputException(e);
- }
- }
-
- public void writeObject(Object o) throws JMSException
- {
- checkWritable();
- if(o == null)
- {
- throw new NullPointerException("Value passed to BytesMessage.writeObject() must be non null");
- }
- else if (o instanceof Boolean)
- {
- writeBoolean((Boolean)o);
- }
- else if (o instanceof Byte)
- {
- writeByte((Byte)o);
- }
- else if (o instanceof Short)
- {
- writeShort((Short)o);
- }
- else if (o instanceof Character)
- {
- writeChar((Character)o);
- }
- else if (o instanceof Integer)
- {
- writeInt((Integer)o);
- }
- else if(o instanceof Long)
- {
- writeLong((Long)o);
- }
- else if(o instanceof Float)
- {
- writeFloat((Float) o);
- }
- else if(o instanceof Double)
- {
- writeDouble((Double) o);
- }
- else if(o instanceof String)
- {
- writeUTF((String) o);
- }
- else if(o instanceof byte[])
- {
- writeBytes((byte[])o);
- }
- else
- {
- throw new MessageFormatException("Value passed to BytesMessage.writeObject() must be of primitive type. Type passed was " + o.getClass().getName());
- }
- }
-
- public void reset() throws JMSException
- {
- if(_bytesOut != null)
- {
- byte[] data = _bytesOut.toByteArray();
- _dataIn = new Data(new Binary(data));
- _dataAsInput = new DataInputStream(new ByteArrayInputStream(data));
- _dataAsOutput = null;
- _bytesOut = null;
- }
- else
- {
-
- final Binary dataBuffer = _dataIn.getValue();
- _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
-
- }
- }
-
- private JMSException handleInputException(final IOException e)
- {
- JMSException ex;
- if(e instanceof EOFException)
- {
- ex = new MessageEOFException(e.getMessage());
- }
- else
- {
- ex = new MessageFormatException(e.getMessage());
- }
- ex.initCause(e);
- ex.setLinkedException(e);
- return ex;
- }
-
- private JMSException handleOutputException(final IOException e)
- {
- JMSException ex = new JMSException(e.getMessage());
- ex.initCause(e);
- ex.setLinkedException(e);
- return ex;
- }
-
- @Override
- public void clearBody() throws JMSException
- {
- super.clearBody();
- _bytesOut = new ByteArrayOutputStream();
- _dataAsOutput = new DataOutputStream(_bytesOut);
- _dataAsInput = null;
- _dataIn = null;
- }
-
- @Override Collection<Section> getSections()
- {
- List<Section> sections = new ArrayList<Section>();
- sections.add(getHeader());
- if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
- {
- sections.add(getMessageAnnotations());
- }
- sections.add(getProperties());
- sections.add(getApplicationProperties());
- sections.add(getDataSection());
- sections.add(getFooter());
- return sections;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.BytesMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import java.io.*;
+import java.util.*;
+
+public class BytesMessageImpl extends MessageImpl implements BytesMessage
+{
+ private DataInputStream _dataAsInput;
+ private DataOutputStream _dataAsOutput;
+ private ByteArrayOutputStream _bytesOut;
+ private Data _dataIn;
+
+ // message created for reading
+ protected BytesMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Data data,
+ Footer footer, SessionImpl session)
+ {
+ super(header, messageAnnotations, properties, appProperties, footer, session);
+ _dataIn = data;
+ final Binary dataBuffer = data.getValue();
+ _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
+
+ }
+
+ // message created to be sent
+ protected BytesMessageImpl(final SessionImpl session)
+ {
+ super(new Header(),
+ new MessageAnnotations(new HashMap()),
+ new Properties(),
+ new ApplicationProperties(new HashMap()),
+ new Footer(Collections.EMPTY_MAP),
+ session);
+
+ _bytesOut = new ByteArrayOutputStream();
+ _dataAsOutput = new DataOutputStream(_bytesOut);
+ }
+
+
+ private Data getDataSection()
+ {
+ if(_bytesOut != null)
+ {
+ return new Data(new Binary(_bytesOut.toByteArray()));
+ }
+ else
+ {
+ return _dataIn;
+ }
+ }
+
+ @Override
+ protected boolean isReadOnly()
+ {
+ return _dataIn != null;
+ }
+
+ public long getBodyLength() throws JMSException
+ {
+ checkReadable();
+ return getDataSection().getValue().getLength();
+ }
+
+ public boolean readBoolean() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readBoolean();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+
+ public byte readByte() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readByte();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public int readUnsignedByte() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readUnsignedByte();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public short readShort() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readShort();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public int readUnsignedShort() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readUnsignedShort();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public char readChar() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readChar();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public int readInt() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readInt();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public long readLong() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readLong();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public float readFloat() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readFloat();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public double readDouble() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readDouble();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public String readUTF() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _dataAsInput.readUTF();
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public int readBytes(byte[] bytes) throws JMSException
+ {
+
+ return readBytes(bytes, bytes.length);
+ }
+
+ public int readBytes(byte[] bytes, int length) throws JMSException
+ {
+ checkReadable();
+
+ try
+ {
+ int offset = 0;
+ while(offset < length)
+ {
+ int read = _dataAsInput.read(bytes, offset, length - offset);
+ if(read < 0)
+ {
+ break;
+ }
+ offset += read;
+ }
+
+ if(offset == 0 && length != 0)
+ {
+ return -1;
+ }
+ else
+ {
+ return offset;
+ }
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ public void writeBoolean(boolean b) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeBoolean(b);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+
+ }
+
+ public void writeByte(byte b) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeByte(b);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeShort(short i) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeShort(i);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeChar(char c) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeChar(c);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeInt(int i) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeInt(i);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeLong(long l) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeLong(l);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeFloat(float v) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeFloat(v);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeDouble(double v) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeDouble(v);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeUTF(String s) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.writeUTF(s);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeBytes(byte[] bytes) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.write(bytes);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeBytes(byte[] bytes, int off, int len) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _dataAsOutput.write(bytes, off, len);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ public void writeObject(Object o) throws JMSException
+ {
+ checkWritable();
+ if(o == null)
+ {
+ throw new NullPointerException("Value passed to BytesMessage.writeObject() must be non null");
+ }
+ else if (o instanceof Boolean)
+ {
+ writeBoolean((Boolean)o);
+ }
+ else if (o instanceof Byte)
+ {
+ writeByte((Byte)o);
+ }
+ else if (o instanceof Short)
+ {
+ writeShort((Short)o);
+ }
+ else if (o instanceof Character)
+ {
+ writeChar((Character)o);
+ }
+ else if (o instanceof Integer)
+ {
+ writeInt((Integer)o);
+ }
+ else if(o instanceof Long)
+ {
+ writeLong((Long)o);
+ }
+ else if(o instanceof Float)
+ {
+ writeFloat((Float) o);
+ }
+ else if(o instanceof Double)
+ {
+ writeDouble((Double) o);
+ }
+ else if(o instanceof String)
+ {
+ writeUTF((String) o);
+ }
+ else if(o instanceof byte[])
+ {
+ writeBytes((byte[])o);
+ }
+ else
+ {
+ throw new MessageFormatException("Value passed to BytesMessage.writeObject() must be of primitive type. Type passed was " + o.getClass().getName());
+ }
+ }
+
+ public void reset() throws JMSException
+ {
+ if(_bytesOut != null)
+ {
+ byte[] data = _bytesOut.toByteArray();
+ _dataIn = new Data(new Binary(data));
+ _dataAsInput = new DataInputStream(new ByteArrayInputStream(data));
+ _dataAsOutput = null;
+ _bytesOut = null;
+ }
+ else
+ {
+
+ final Binary dataBuffer = _dataIn.getValue();
+ _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
+
+ }
+ }
+
+ private JMSException handleInputException(final IOException e)
+ {
+ JMSException ex;
+ if(e instanceof EOFException)
+ {
+ ex = new MessageEOFException(e.getMessage());
+ }
+ else
+ {
+ ex = new MessageFormatException(e.getMessage());
+ }
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ return ex;
+ }
+
+ private JMSException handleOutputException(final IOException e)
+ {
+ JMSException ex = new JMSException(e.getMessage());
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ return ex;
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _bytesOut = new ByteArrayOutputStream();
+ _dataAsOutput = new DataOutputStream(_bytesOut);
+ _dataAsInput = null;
+ _dataIn = null;
+ }
+
+ @Override Collection<Section> getSections()
+ {
+ List<Section> sections = new ArrayList<Section>();
+ sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
+ sections.add(getProperties());
+ sections.add(getApplicationProperties());
+ sections.add(getDataSection());
+ sections.add(getFooter());
+ return sections;
+ }
+
+}
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java Mon Oct 21 22:04:51 2013
@@ -33,6 +33,7 @@ import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import org.apache.qpid.amqp_1_0.jms.ConnectionFactory;
+
public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnectionFactory, QueueConnectionFactory
{
private String _host;
@@ -45,7 +46,10 @@ public class ConnectionFactoryImpl imple
private String _queuePrefix;
private String _topicPrefix;
- private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));;
+ private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
+ private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
+ private int _maxSessions = Integer.getInteger("qpid.max_sessions", 0);
+
public ConnectionFactoryImpl(final String host,
final int port,
@@ -82,6 +86,18 @@ public class ConnectionFactoryImpl imple
final String remoteHost,
final boolean ssl)
{
+ this(host, port, username, password, clientId, remoteHost, ssl,0);
+ }
+
+ public ConnectionFactoryImpl(final String host,
+ final int port,
+ final String username,
+ final String password,
+ final String clientId,
+ final String remoteHost,
+ final boolean ssl,
+ final int maxSessions)
+ {
_host = host;
_port = port;
_username = username;
@@ -89,6 +105,7 @@ public class ConnectionFactoryImpl imple
_clientId = clientId;
_remoteHost = remoteHost;
_ssl = ssl;
+ _maxSessions = maxSessions;
}
public ConnectionImpl createConnection() throws JMSException
@@ -98,10 +115,11 @@ public class ConnectionFactoryImpl imple
public ConnectionImpl createConnection(final String username, final String password) throws JMSException
{
- ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl);
+ ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl, _maxSessions);
connection.setQueuePrefix(_queuePrefix);
connection.setTopicPrefix(_topicPrefix);
connection.setUseBinaryMessageId(_useBinaryMessageId);
+ connection.setSyncPublish(_syncPublish);
return connection;
}
@@ -153,6 +171,8 @@ public class ConnectionFactoryImpl imple
String remoteHost = null;
boolean binaryMessageId = true;
+ boolean syncPublish = false;
+ int maxSessions = 0;
if(userInfo != null)
{
@@ -185,6 +205,14 @@ public class ConnectionFactoryImpl imple
{
binaryMessageId = Boolean.parseBoolean(keyValuePair[1]);
}
+ else if (keyValuePair[0].equalsIgnoreCase("sync-publish"))
+ {
+ syncPublish = Boolean.parseBoolean(keyValuePair[1]);
+ }
+ else if(keyValuePair[0].equalsIgnoreCase("max-sessions"))
+ {
+ maxSessions = Integer.parseInt(keyValuePair[1]);
+ }
}
}
@@ -194,8 +222,9 @@ public class ConnectionFactoryImpl imple
}
ConnectionFactoryImpl connectionFactory =
- new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl);
+ new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl, maxSessions);
connectionFactory.setUseBinaryMessageId(binaryMessageId);
+ connectionFactory.setSyncPublish(syncPublish);
return connectionFactory;
@@ -253,4 +282,9 @@ public class ConnectionFactoryImpl imple
{
_useBinaryMessageId = useBinaryMessageId;
}
+
+ public void setSyncPublish(boolean syncPublish)
+ {
+ _syncPublish = syncPublish;
+ }
}
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java Mon Oct 21 22:04:51 2013
@@ -1,525 +1,574 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.ConnectionException;
-import org.apache.qpid.amqp_1_0.jms.Connection;
-import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
-import org.apache.qpid.amqp_1_0.jms.Session;
-import org.apache.qpid.amqp_1_0.transport.Container;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.jms.Queue;
-import java.util.*;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
-{
-
- private ConnectionMetaData _connectionMetaData;
- private volatile ExceptionListener _exceptionListener;
-
- private final List<SessionImpl> _sessions = new ArrayList<SessionImpl>();
-
- private final Object _lock = new Object();
-
- private org.apache.qpid.amqp_1_0.client.Connection _conn;
- private boolean _isQueueConnection;
- private boolean _isTopicConnection;
- private final Collection<CloseTask> _closeTasks = new ArrayList<CloseTask>();
- private String _host;
- private int _port;
- private final String _username;
- private final String _password;
- private String _remoteHost;
- private final boolean _ssl;
- private String _clientId;
- private String _queuePrefix;
- private String _topicPrefix;
- private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
-
- private static enum State
- {
- UNCONNECTED,
- STOPPED,
- STARTED,
- CLOSED
- }
-
- private volatile State _state = State.UNCONNECTED;
-
- public ConnectionImpl(String host, int port, String username, String password, String clientId) throws JMSException
- {
- this(host,port,username,password,clientId,false);
- }
-
- public ConnectionImpl(String host, int port, String username, String password, String clientId, boolean ssl) throws JMSException
- {
- this(host,port,username,password,clientId,null,ssl);
- }
-
- public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl) throws JMSException
- {
- _host = host;
- _port = port;
- _username = username;
- _password = password;
- _clientId = clientId;
- _remoteHost = remoteHost;
- _ssl = ssl;
- }
-
- private void connect() throws JMSException
- {
- synchronized(_lock)
- {
- // already connected?
- if( _state == State.UNCONNECTED )
- {
- _state = State.STOPPED;
-
- Container container = _clientId == null ? new Container() : new Container(_clientId);
- // TODO - authentication, containerId, clientId, ssl?, etc
- try
- {
- _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
- _port, _username, _password, container, _remoteHost, _ssl);
- // TODO - retrieve negotiated AMQP version
- _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
- }
- catch (ConnectionException e)
- {
- JMSException jmsEx = new JMSException(e.getMessage());
- jmsEx.setLinkedException(e);
- jmsEx.initCause(e);
- throw jmsEx;
- }
- }
- }
- }
-
- private void checkNotConnected(String msg) throws IllegalStateException
- {
- synchronized(_lock)
- {
- if( _state != State.UNCONNECTED )
- {
- throw new IllegalStateException(msg);
- }
- }
- }
-
- public SessionImpl createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
- {
- Session.AcknowledgeMode ackMode;
-
- try
- {
- ackMode = transacted ? Session.AcknowledgeMode.SESSION_TRANSACTED
- : Session.AcknowledgeMode.values()[acknowledgeMode];
- }
- catch (IndexOutOfBoundsException e)
- {
- JMSException jmsEx = new JMSException("Unknown acknowledgement mode " + acknowledgeMode);
- jmsEx.setLinkedException(e);
- jmsEx.initCause(e);
- throw jmsEx;
- }
-
- return createSession(ackMode);
- }
-
- public SessionImpl createSession(final Session.AcknowledgeMode acknowledgeMode) throws JMSException
- {
- boolean started = false;
- synchronized(_lock)
- {
- if(_state == State.CLOSED)
- {
- throw new IllegalStateException("Cannot create a session on a closed connection");
- }
- else if(_state == State.UNCONNECTED)
- {
- connect();
- started = true;
- }
-
- try
- {
- SessionImpl session = new SessionImpl(this, acknowledgeMode);
- session.setQueueSession(_isQueueConnection);
- session.setTopicSession(_isTopicConnection);
- _sessions.add(session);
-
- if(_state == State.STARTED)
- {
- session.start();
- }
-
- return session;
- }
- catch(JMSException e)
- {
- Error remoteError;
- if(started
- && e.getLinkedException() instanceof ConnectionErrorException
- && (remoteError = ((ConnectionErrorException)e.getLinkedException()).getRemoteError()).getCondition() == ConnectionError.REDIRECT)
- {
- String networkHost = (String) remoteError.getInfo().get(Symbol.valueOf("network-host"));
- int port = (Integer) remoteError.getInfo().get(Symbol.valueOf("port"));
- String hostName = (String) remoteError.getInfo().get(Symbol.valueOf("hostname"));
- reconnect(networkHost,port,hostName);
- return createSession(acknowledgeMode);
-
- }
- else
- {
- throw e;
- }
- }
- }
-
-
- }
-
- void removeSession(SessionImpl session)
- {
- synchronized (_lock)
- {
- _sessions.remove(session);
- }
- }
-
- private void reconnect(String networkHost, int port, String hostName)
- {
- synchronized(_lock)
- {
- _state = State.UNCONNECTED;
- _host = networkHost;
- _port = port;
- _remoteHost = hostName;
- _conn = null;
- }
- }
-
- public String getClientID() throws JMSException
- {
- checkClosed();
- return _clientId;
- }
-
- public void setClientID(final String value) throws JMSException
- {
- checkNotConnected("Cannot set client-id to \""
- + value
- + "\"; client-id must be set before the connection is used");
- if( _clientId !=null )
- {
- throw new IllegalStateException("client-id has already been set");
- }
- _clientId = value;
- }
-
- public ConnectionMetaData getMetaData() throws JMSException
- {
- checkClosed();
- return _connectionMetaData;
- }
-
- public ExceptionListener getExceptionListener() throws JMSException
- {
- checkClosed();
- return _exceptionListener;
- }
-
- public void setExceptionListener(final ExceptionListener exceptionListener) throws JMSException
- {
- checkClosed();
- _exceptionListener = exceptionListener;
- }
-
- public void start() throws JMSException
- {
- synchronized(_lock)
- {
- checkClosed();
- connect();
- if(_state == State.STOPPED)
- {
- // TODO
-
- _state = State.STARTED;
-
- for(SessionImpl session : _sessions)
- {
- session.start();
- }
-
- }
-
- _lock.notifyAll();
- }
-
- }
-
- public void stop() throws JMSException
- {
- synchronized(_lock)
- {
- switch(_state)
- {
- case STARTED:
- for(SessionImpl session : _sessions)
- {
- session.stop();
- }
- case UNCONNECTED:
- _state = State.STOPPED;
- break;
- case CLOSED:
- throw new javax.jms.IllegalStateException("Closed");
- }
-
- _lock.notifyAll();
- }
- }
-
-
- static interface CloseTask
- {
- public void onClose() throws JMSException;
- }
-
- void addOnCloseTask(CloseTask task)
- {
- synchronized (_lock)
- {
- _closeTasks.add(task);
- }
- }
-
-
- void removeOnCloseTask(CloseTask task)
- {
- synchronized (_lock)
- {
- _closeTasks.remove(task);
- }
- }
-
- public void close() throws JMSException
- {
- synchronized(_lock)
- {
- if(_state != State.CLOSED)
- {
- stop();
- List<SessionImpl> sessions = new ArrayList<SessionImpl>(_sessions);
- for(SessionImpl session : sessions)
- {
- session.close();
- }
- for(CloseTask task : _closeTasks)
- {
- task.onClose();
- }
- if(_conn != null && _state != State.UNCONNECTED ) {
- _conn.close();
- }
- _state = State.CLOSED;
- }
-
- _lock.notifyAll();
- }
- }
-
- private void checkClosed() throws IllegalStateException
- {
- if(_state == State.CLOSED)
- throw new IllegalStateException("Closed");
- }
-
- public ConnectionConsumer createConnectionConsumer(final Destination destination,
- final String s,
- final ServerSessionPool serverSessionPool,
- final int i) throws JMSException
- {
- checkClosed();
- return null; //TODO
- }
-
- public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
- {
- checkClosed();
- SessionImpl session = createSession(transacted, acknowledgeMode);
- session.setTopicSession(true);
- return session;
- }
-
- public ConnectionConsumer createConnectionConsumer(final Topic topic,
- final String s,
- final ServerSessionPool serverSessionPool,
- final int i) throws JMSException
- {
- checkClosed();
- return null; //TODO
- }
-
- public ConnectionConsumer createDurableConnectionConsumer(final Topic topic,
- final String s,
- final String s1,
- final ServerSessionPool serverSessionPool,
- final int i) throws JMSException
- {
- checkClosed();
- if (_isQueueConnection)
- {
- throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
- }
- return null; //TODO
- }
-
- public QueueSession createQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException
- {
- checkClosed();
- SessionImpl session = createSession(transacted, acknowledgeMode);
- session.setQueueSession(true);
- return session;
- }
-
- public ConnectionConsumer createConnectionConsumer(final Queue queue,
- final String s,
- final ServerSessionPool serverSessionPool,
- final int i) throws JMSException
- {
- checkClosed();
- return null; //TODO
- }
-
-
-
- protected org.apache.qpid.amqp_1_0.client.Connection getClientConnection()
- {
- return _conn;
- }
-
- public boolean isStarted()
- {
- return _state == State.STARTED;
- }
-
- void setQueueConnection(final boolean queueConnection)
- {
- _isQueueConnection = queueConnection;
- }
-
- void setTopicConnection(final boolean topicConnection)
- {
- _isTopicConnection = topicConnection;
- }
-
- public String getTopicPrefix()
- {
- return _topicPrefix;
- }
-
- public void setTopicPrefix(String topicPrefix)
- {
- _topicPrefix = topicPrefix;
- }
-
- public String getQueuePrefix()
- {
- return _queuePrefix;
- }
-
- public void setQueuePrefix(String queueprefix)
- {
- _queuePrefix = queueprefix;
- }
-
- DecodedDestination toDecodedDestination(DestinationImpl dest)
- {
- String address = dest.getAddress();
- Set<String> kind = null;
- Class clazz = dest.getClass();
- if( clazz==QueueImpl.class )
- {
- kind = MessageImpl.JMS_QUEUE_ATTRIBUTES;
- if( _queuePrefix!=null )
- {
- // Avoid double prefixing..
- if( !address.startsWith(_queuePrefix) )
- {
- address = _queuePrefix+address;
- }
- }
- }
- else if( clazz==TopicImpl.class )
- {
- kind = MessageImpl.JMS_TOPIC_ATTRIBUTES;
- if( _topicPrefix!=null )
- {
- // Avoid double prefixing..
- if( !address.startsWith(_topicPrefix) )
- {
- address = _topicPrefix+address;
- }
- }
- }
- else if( clazz==TemporaryQueueImpl.class )
- {
- kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES;
- }
- else if( clazz==TemporaryTopicImpl.class )
- {
- kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES;
- }
- return new DecodedDestination(address, kind);
- }
-
- DecodedDestination toDecodedDestination(String address, Set<String> kind)
- {
- if( (kind == null || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES)) && _queuePrefix!=null && address.startsWith(_queuePrefix))
- {
- return new DecodedDestination(address.substring(_queuePrefix.length()), MessageImpl.JMS_QUEUE_ATTRIBUTES);
- }
- if( (kind == null || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES)) && _topicPrefix!=null && address.startsWith(_topicPrefix))
- {
- return new DecodedDestination(address.substring(_topicPrefix.length()), MessageImpl.JMS_TOPIC_ATTRIBUTES);
- }
- return new DecodedDestination(address, kind);
- }
-
- void setUseBinaryMessageId(boolean useBinaryMessageId)
- {
- _useBinaryMessageId = useBinaryMessageId;
- }
-
- boolean useBinaryMessageId()
- {
- return _useBinaryMessageId;
- }
-
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.ConnectionException;
+import org.apache.qpid.amqp_1_0.jms.Connection;
+import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
+import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.transport.Container;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.jms.Queue;
+
+import java.util.*;
+
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
+{
+
+ private ConnectionMetaData _connectionMetaData;
+ private volatile ExceptionListener _exceptionListener;
+
+ private final List<SessionImpl> _sessions = new ArrayList<SessionImpl>();
+
+ private final Object _lock = new Object();
+
+ private org.apache.qpid.amqp_1_0.client.Connection _conn;
+ private boolean _isQueueConnection;
+ private boolean _isTopicConnection;
+ private final Collection<CloseTask> _closeTasks = new ArrayList<CloseTask>();
+ private String _host;
+ private int _port;
+ private final String _username;
+ private final String _password;
+ private String _remoteHost;
+ private final boolean _ssl;
+ private String _clientId;
+ private String _queuePrefix;
+ private String _topicPrefix;
+ private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
+ private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
+ private int _maxSessions;
+
+ private static enum State
+ {
+ UNCONNECTED,
+ STOPPED,
+ STARTED,
+ CLOSED
+ }
+
+ private volatile State _state = State.UNCONNECTED;
+
+ public ConnectionImpl(String host, int port, String username, String password, String clientId) throws JMSException
+ {
+ this(host,port,username,password,clientId,false);
+ }
+
+ public ConnectionImpl(String host, int port, String username, String password, String clientId, boolean ssl) throws JMSException
+ {
+ this(host,port,username,password,clientId,null,ssl);
+ }
+
+ public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl) throws JMSException
+ {
+ this(host, port, username, password, clientId, remoteHost, ssl,0);
+ }
+
+ public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException
+ {
+ _host = host;
+ _port = port;
+ _username = username;
+ _password = password;
+ _clientId = clientId;
+ _remoteHost = remoteHost;
+ _ssl = ssl;
+ _maxSessions = maxSessions;
+ }
+
+ private void connect() throws JMSException
+ {
+ synchronized(_lock)
+ {
+ // already connected?
+ if( _state == State.UNCONNECTED )
+ {
+ _state = State.STOPPED;
+
+ Container container = _clientId == null ? new Container() : new Container(_clientId);
+ // TODO - authentication, containerId, clientId, ssl?, etc
+ try
+ {
+ _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
+ _port, _username, _password, container, _remoteHost, _ssl,
+ _maxSessions - 1);
+ _conn.setConnectionErrorTask(new ConnectionErrorTask());
+ // TODO - retrieve negotiated AMQP version
+ _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
+ }
+ catch (ConnectionException e)
+ {
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.setLinkedException(e);
+ jmsEx.initCause(e);
+ throw jmsEx;
+ }
+ }
+ }
+ }
+
+ private void checkNotConnected(String msg) throws IllegalStateException
+ {
+ synchronized(_lock)
+ {
+ if( _state != State.UNCONNECTED )
+ {
+ throw new IllegalStateException(msg);
+ }
+ }
+ }
+
+ public SessionImpl createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ {
+ Session.AcknowledgeMode ackMode;
+
+ try
+ {
+ ackMode = transacted ? Session.AcknowledgeMode.SESSION_TRANSACTED
+ : Session.AcknowledgeMode.values()[acknowledgeMode];
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ JMSException jmsEx = new JMSException("Unknown acknowledgement mode " + acknowledgeMode);
+ jmsEx.setLinkedException(e);
+ jmsEx.initCause(e);
+ throw jmsEx;
+ }
+
+ return createSession(ackMode);
+ }
+
+ public SessionImpl createSession(final Session.AcknowledgeMode acknowledgeMode) throws JMSException
+ {
+ boolean started = false;
+ synchronized(_lock)
+ {
+ if(_state == State.CLOSED)
+ {
+ throw new IllegalStateException("Cannot create a session on a closed connection");
+ }
+ else if(_state == State.UNCONNECTED)
+ {
+ connect();
+ started = true;
+ }
+
+ try
+ {
+ SessionImpl session = new SessionImpl(this, acknowledgeMode);
+ session.setQueueSession(_isQueueConnection);
+ session.setTopicSession(_isTopicConnection);
+ _sessions.add(session);
+
+ if(_state == State.STARTED)
+ {
+ session.start();
+ }
+
+ return session;
+ }
+ catch(JMSException e)
+ {
+ Error remoteError;
+ if(started
+ && e.getLinkedException() instanceof ConnectionErrorException
+ && (remoteError = ((ConnectionErrorException)e.getLinkedException()).getRemoteError()).getCondition() == ConnectionError.REDIRECT)
+ {
+ String networkHost = (String) remoteError.getInfo().get(Symbol.valueOf("network-host"));
+ int port = (Integer) remoteError.getInfo().get(Symbol.valueOf("port"));
+ String hostName = (String) remoteError.getInfo().get(Symbol.valueOf("hostname"));
+ reconnect(networkHost,port,hostName);
+ return createSession(acknowledgeMode);
+
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+
+
+ }
+
+ void removeSession(SessionImpl session)
+ {
+ synchronized (_lock)
+ {
+ _sessions.remove(session);
+ }
+ }
+
+ private void reconnect(String networkHost, int port, String hostName)
+ {
+ synchronized(_lock)
+ {
+ _state = State.UNCONNECTED;
+ _host = networkHost;
+ _port = port;
+ _remoteHost = hostName;
+ _conn = null;
+ }
+ }
+
+ public String getClientID() throws JMSException
+ {
+ checkClosed();
+ return _clientId;
+ }
+
+ public void setClientID(final String value) throws JMSException
+ {
+ checkNotConnected("Cannot set client-id to \""
+ + value
+ + "\"; client-id must be set before the connection is used");
+ if( _clientId !=null )
+ {
+ throw new IllegalStateException("client-id has already been set");
+ }
+ _clientId = value;
+ }
+
+ public ConnectionMetaData getMetaData() throws JMSException
+ {
+ checkClosed();
+ return _connectionMetaData;
+ }
+
+ public ExceptionListener getExceptionListener() throws JMSException
+ {
+ checkClosed();
+ return _exceptionListener;
+ }
+
+ public void setExceptionListener(final ExceptionListener exceptionListener) throws JMSException
+ {
+ checkClosed();
+ _exceptionListener = exceptionListener;
+ }
+
+ public void start() throws JMSException
+ {
+ synchronized(_lock)
+ {
+ checkClosed();
+ connect();
+ if(_state == State.STOPPED)
+ {
+ // TODO
+
+ _state = State.STARTED;
+
+ for(SessionImpl session : _sessions)
+ {
+ session.start();
+ }
+
+ }
+
+ _lock.notifyAll();
+ }
+
+ }
+
+ public void stop() throws JMSException
+ {
+ synchronized(_lock)
+ {
+ switch(_state)
+ {
+ case STARTED:
+ for(SessionImpl session : _sessions)
+ {
+ session.stop();
+ }
+ case UNCONNECTED:
+ _state = State.STOPPED;
+ break;
+ case CLOSED:
+ throw new javax.jms.IllegalStateException("Closed");
+ }
+
+ _lock.notifyAll();
+ }
+ }
+
+
+ static interface CloseTask
+ {
+ public void onClose() throws JMSException;
+ }
+
+ void addOnCloseTask(CloseTask task)
+ {
+ synchronized (_lock)
+ {
+ _closeTasks.add(task);
+ }
+ }
+
+
+ void removeOnCloseTask(CloseTask task)
+ {
+ synchronized (_lock)
+ {
+ _closeTasks.remove(task);
+ }
+ }
+
+ public void close() throws JMSException
+ {
+ synchronized(_lock)
+ {
+ if(_state != State.CLOSED)
+ {
+ stop();
+ List<SessionImpl> sessions = new ArrayList<SessionImpl>(_sessions);
+ for(SessionImpl session : sessions)
+ {
+ session.close();
+ }
+ for(CloseTask task : _closeTasks)
+ {
+ task.onClose();
+ }
+ if(_conn != null && _state != State.UNCONNECTED ) {
+ _conn.close();
+ }
+ _state = State.CLOSED;
+ }
+
+ _lock.notifyAll();
+ }
+ }
+
+ private void checkClosed() throws IllegalStateException
+ {
+ if(_state == State.CLOSED)
+ throw new IllegalStateException("Closed");
+ }
+
+ public ConnectionConsumer createConnectionConsumer(final Destination destination,
+ final String s,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ checkClosed();
+ return null; //TODO
+ }
+
+ public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ {
+ checkClosed();
+ SessionImpl session = createSession(transacted, acknowledgeMode);
+ session.setTopicSession(true);
+ return session;
+ }
+
+ public ConnectionConsumer createConnectionConsumer(final Topic topic,
+ final String s,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ checkClosed();
+ return null; //TODO
+ }
+
+ public ConnectionConsumer createDurableConnectionConsumer(final Topic topic,
+ final String s,
+ final String s1,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ checkClosed();
+ if (_isQueueConnection)
+ {
+ throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
+ }
+ return null; //TODO
+ }
+
+ public QueueSession createQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ {
+ checkClosed();
+ SessionImpl session = createSession(transacted, acknowledgeMode);
+ session.setQueueSession(true);
+ return session;
+ }
+
+ public ConnectionConsumer createConnectionConsumer(final Queue queue,
+ final String s,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ checkClosed();
+ return null; //TODO
+ }
+
+
+
+ protected org.apache.qpid.amqp_1_0.client.Connection getClientConnection()
+ {
+ return _conn;
+ }
+
+ public boolean isStarted()
+ {
+ return _state == State.STARTED;
+ }
+
+ void setQueueConnection(final boolean queueConnection)
+ {
+ _isQueueConnection = queueConnection;
+ }
+
+ void setTopicConnection(final boolean topicConnection)
+ {
+ _isTopicConnection = topicConnection;
+ }
+
+ public String getTopicPrefix()
+ {
+ return _topicPrefix;
+ }
+
+ public void setTopicPrefix(String topicPrefix)
+ {
+ _topicPrefix = topicPrefix;
+ }
+
+ public String getQueuePrefix()
+ {
+ return _queuePrefix;
+ }
+
+ public void setQueuePrefix(String queueprefix)
+ {
+ _queuePrefix = queueprefix;
+ }
+
+ DecodedDestination toDecodedDestination(DestinationImpl dest)
+ {
+ String address = dest.getAddress();
+ Set<String> kind = null;
+ Class clazz = dest.getClass();
+ if( clazz==QueueImpl.class )
+ {
+ kind = MessageImpl.JMS_QUEUE_ATTRIBUTES;
+ if( _queuePrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_queuePrefix) )
+ {
+ address = _queuePrefix+address;
+ }
+ }
+ }
+ else if( clazz==TopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TOPIC_ATTRIBUTES;
+ if( _topicPrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_topicPrefix) )
+ {
+ address = _topicPrefix+address;
+ }
+ }
+ }
+ else if( clazz==TemporaryQueueImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES;
+ }
+ else if( clazz==TemporaryTopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES;
+ }
+ return new DecodedDestination(address, kind);
+ }
+
+ DecodedDestination toDecodedDestination(String address, Set<String> kind)
+ {
+ if( (kind == null || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES)) && _queuePrefix!=null && address.startsWith(_queuePrefix))
+ {
+ return new DecodedDestination(address.substring(_queuePrefix.length()), MessageImpl.JMS_QUEUE_ATTRIBUTES);
+ }
+ if( (kind == null || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES)) && _topicPrefix!=null && address.startsWith(_topicPrefix))
+ {
+ return new DecodedDestination(address.substring(_topicPrefix.length()), MessageImpl.JMS_TOPIC_ATTRIBUTES);
+ }
+ return new DecodedDestination(address, kind);
+ }
+
+ void setUseBinaryMessageId(boolean useBinaryMessageId)
+ {
+ _useBinaryMessageId = useBinaryMessageId;
+ }
+
+ boolean useBinaryMessageId()
+ {
+ return _useBinaryMessageId;
+ }
+
+ void setSyncPublish(boolean syncPublish)
+ {
+ _syncPublish = syncPublish;
+ }
+
+ boolean syncPublish()
+ {
+ return _syncPublish;
+ }
+
+ private class ConnectionErrorTask implements Runnable
+ {
+
+ @Override
+ public void run()
+ {
+
+ try
+ {
+ final ExceptionListener exceptionListener = getExceptionListener();
+
+ if(exceptionListener != null)
+ {
+ final org.apache.qpid.amqp_1_0.type.transport.Error connectionError = _conn.getConnectionError();
+ if(connectionError != null)
+ {
+ exceptionListener.onException(new JMSException(connectionError.getDescription(),
+ connectionError.getCondition().toString()));
+ }
+ }
+ }
+ catch (JMSException ignored)
+ {
+ // ignored
+ }
+ }
+ }
+
+}
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java Mon Oct 21 22:04:51 2013
@@ -1,105 +1,105 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
-
-import javax.jms.JMSException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-
-public class ConnectionMetaDataImpl implements ConnectionMetaData
-{
- private static final int JMS_MAJOR_VERSION = 1;
- private static final int JMS_MINOR_VERSION = 1;
-
- private static final int PROVIDER_MAJOR_VERSION = 1;
- private static final int PROVIDER_MINOR_VERSION = 0;
-
-
- private final int _amqpMajorVersion;
- private final int _amqpMinorVersion;
- private final int _amqpRevisionVersion;
- private static final Collection<String> _jmsxProperties = Arrays.asList("JMSXGroupID", "JMSXGroupSeq");
-
- public ConnectionMetaDataImpl(final int amqpMajorVersion, final int amqpMinorVersion, final int amqpRevisionVersion)
- {
- _amqpMajorVersion = amqpMajorVersion;
- _amqpMinorVersion = amqpMinorVersion;
- _amqpRevisionVersion = amqpRevisionVersion;
- }
-
- public String getJMSVersion() throws JMSException
- {
- return getJMSMajorVersion() + "." + getJMSMinorVersion();
- }
-
- public int getJMSMajorVersion() throws JMSException
- {
- return JMS_MAJOR_VERSION;
- }
-
- public int getJMSMinorVersion() throws JMSException
- {
- return JMS_MINOR_VERSION;
- }
-
- public String getJMSProviderName() throws JMSException
- {
- return "AMQP.ORG";
- }
-
- public String getProviderVersion() throws JMSException
- {
- return getProviderMajorVersion() + "." + getProviderMinorVersion();
- }
-
- public int getProviderMajorVersion() throws JMSException
- {
- return PROVIDER_MAJOR_VERSION;
- }
-
- public int getProviderMinorVersion() throws JMSException
- {
- return PROVIDER_MINOR_VERSION;
- }
-
- public Enumeration getJMSXPropertyNames() throws JMSException
- {
-
- return Collections.enumeration(_jmsxProperties);
- }
-
- public int getAMQPMajorVersion()
- {
- return _amqpMajorVersion;
- }
-
- public int getAMQPMinorVersion()
- {
- return _amqpMinorVersion;
- }
-
- public int getAMQPRevisionVersion()
- {
- return _amqpRevisionVersion;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
+
+import javax.jms.JMSException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+
+public class ConnectionMetaDataImpl implements ConnectionMetaData
+{
+ private static final int JMS_MAJOR_VERSION = 1;
+ private static final int JMS_MINOR_VERSION = 1;
+
+ private static final int PROVIDER_MAJOR_VERSION = 1;
+ private static final int PROVIDER_MINOR_VERSION = 0;
+
+
+ private final int _amqpMajorVersion;
+ private final int _amqpMinorVersion;
+ private final int _amqpRevisionVersion;
+ private static final Collection<String> _jmsxProperties = Arrays.asList("JMSXGroupID", "JMSXGroupSeq");
+
+ public ConnectionMetaDataImpl(final int amqpMajorVersion, final int amqpMinorVersion, final int amqpRevisionVersion)
+ {
+ _amqpMajorVersion = amqpMajorVersion;
+ _amqpMinorVersion = amqpMinorVersion;
+ _amqpRevisionVersion = amqpRevisionVersion;
+ }
+
+ public String getJMSVersion() throws JMSException
+ {
+ return getJMSMajorVersion() + "." + getJMSMinorVersion();
+ }
+
+ public int getJMSMajorVersion() throws JMSException
+ {
+ return JMS_MAJOR_VERSION;
+ }
+
+ public int getJMSMinorVersion() throws JMSException
+ {
+ return JMS_MINOR_VERSION;
+ }
+
+ public String getJMSProviderName() throws JMSException
+ {
+ return "AMQP.ORG";
+ }
+
+ public String getProviderVersion() throws JMSException
+ {
+ return getProviderMajorVersion() + "." + getProviderMinorVersion();
+ }
+
+ public int getProviderMajorVersion() throws JMSException
+ {
+ return PROVIDER_MAJOR_VERSION;
+ }
+
+ public int getProviderMinorVersion() throws JMSException
+ {
+ return PROVIDER_MINOR_VERSION;
+ }
+
+ public Enumeration getJMSXPropertyNames() throws JMSException
+ {
+
+ return Collections.enumeration(_jmsxProperties);
+ }
+
+ public int getAMQPMajorVersion()
+ {
+ return _amqpMajorVersion;
+ }
+
+ public int getAMQPMinorVersion()
+ {
+ return _amqpMinorVersion;
+ }
+
+ public int getAMQPRevisionVersion()
+ {
+ return _amqpRevisionVersion;
+ }
+}
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java (original)
+++ qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java Mon Oct 21 22:04:51 2013
@@ -1,85 +1,85 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.Destination;
-import org.apache.qpid.amqp_1_0.jms.Queue;
-import org.apache.qpid.amqp_1_0.jms.Topic;
-
-import javax.jms.JMSException;
-import java.util.WeakHashMap;
-
-public class DestinationImpl implements Destination, Queue, Topic
-{
- private static final WeakHashMap<String, DestinationImpl> DESTINATION_CACHE =
- new WeakHashMap<String, DestinationImpl>();
-
- private final String _address;
-
- protected DestinationImpl(String address)
- {
- _address = address;
- }
-
- public String getAddress()
- {
- return _address;
- }
-
- public static DestinationImpl valueOf(String address)
- {
- return address == null ? null : createDestination(address);
- }
-
- @Override
- public int hashCode()
- {
- return _address.hashCode();
- }
-
- @Override
- public boolean equals(final Object obj)
- {
- return obj != null
- && obj.getClass() == getClass()
- && _address.equals(((DestinationImpl)obj)._address);
- }
-
- public static synchronized DestinationImpl createDestination(final String address)
- {
- DestinationImpl destination = DESTINATION_CACHE.get(address);
- if(destination == null)
- {
- destination = new DestinationImpl(address);
- DESTINATION_CACHE.put(address, destination);
- }
- return destination;
- }
-
- public String getQueueName() throws JMSException
- {
- return getAddress();
- }
-
- public String getTopicName() throws JMSException
- {
- return getAddress();
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.Destination;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.Topic;
+
+import javax.jms.JMSException;
+import java.util.WeakHashMap;
+
+public class DestinationImpl implements Destination, Queue, Topic
+{
+ private static final WeakHashMap<String, DestinationImpl> DESTINATION_CACHE =
+ new WeakHashMap<String, DestinationImpl>();
+
+ private final String _address;
+
+ protected DestinationImpl(String address)
+ {
+ _address = address;
+ }
+
+ public String getAddress()
+ {
+ return _address;
+ }
+
+ public static DestinationImpl valueOf(String address)
+ {
+ return address == null ? null : createDestination(address);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _address.hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj)
+ {
+ return obj != null
+ && obj.getClass() == getClass()
+ && _address.equals(((DestinationImpl)obj)._address);
+ }
+
+ public static synchronized DestinationImpl createDestination(final String address)
+ {
+ DestinationImpl destination = DESTINATION_CACHE.get(address);
+ if(destination == null)
+ {
+ destination = new DestinationImpl(address);
+ DESTINATION_CACHE.put(address, destination);
+ }
+ return destination;
+ }
+
+ public String getQueueName() throws JMSException
+ {
+ return getAddress();
+ }
+
+ public String getTopicName() throws JMSException
+ {
+ return getAddress();
+ }
+}
Propchange: qpid/branches/linearstore/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org