You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/19 13:36:26 UTC
svn commit: r577253 [5/7] - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity:
nclient/ nclient/impl/ nclient/util/ njms/ njms/message/
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueSessionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueSessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueSessionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,64 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.QueueSession;
+import javax.jms.JMSException;
+import javax.jms.XAQueueSession;
+
+/**
+ * Imeplements javax.njms.XAQueueSessionImpl
+ */
+public class XAQueueSessionImpl extends XASessionImpl implements XAQueueSession
+{
+ /**
+ * The standard session
+ */
+ private QueueSession _jmsQueueSession;
+
+ //-- Constructors
+ /**
+ * Create a JMS XAQueueSessionImpl
+ *
+ * @param connection The ConnectionImpl object from which the Session is created.
+ * @throws org.apache.qpidity.QpidException
+ * In case of internal error.
+ */
+ protected XAQueueSessionImpl(ConnectionImpl connection) throws QpidException
+ {
+ super(connection);
+ }
+
+ //--- interface XAQueueSession
+ /**
+ * Gets the topic session associated with this <CODE>XATopicSession</CODE>.
+ *
+ * @return the topic session object
+ * @throws JMSException If an internal error occurs.
+ */
+ public QueueSession getQueueSession() throws JMSException
+ {
+ if (_jmsQueueSession == null)
+ {
+ _jmsQueueSession = getConnection().createQueueSession(true, getAcknowledgeMode());
+ }
+ return _jmsQueueSession;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueSessionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,507 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.dtx.XidImpl;
+import org.apache.qpidity.transport.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an implementation of javax.njms.XAResource.
+ */
+public class XAResourceImpl implements XAResource
+{
+ /**
+ * this XAResourceImpl's logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class);
+
+ /**
+ * Reference to the associated XASession
+ */
+ private XASessionImpl _xaSession = null;
+
+ /**
+ * The XID of this resource
+ */
+ private Xid _xid;
+
+ //--- constructor
+
+ /**
+ * Create an XAResource associated with a XASession
+ *
+ * @param xaSession The session XAresource
+ */
+ protected XAResourceImpl(XASessionImpl xaSession)
+ {
+ _xaSession = xaSession;
+ }
+
+ //--- The XAResource
+ /**
+ * Commits the global transaction specified by xid.
+ *
+ * @param xid A global transaction identifier
+ * @param b If true, use a one-phase commit protocol to commit the work done on behalf of xid.
+ * @throws XAException An error has occurred. An error has occurred. Possible XAExceptions are XA_HEURHAZ,
+ * XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+ */
+ public void commit(Xid xid, boolean b) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("commit ", xid);
+ }
+ if (xid == null)
+ {
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ Future<DtxCoordinationCommitResult> future;
+ try
+ {
+ future = _xaSession.getQpidSession()
+ .dtxCoordinationCommit(XidImpl.convertToString(xid), b ? Option.ONE_PHASE : Option.NO_OPTION);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Cannot convert Xid into String format ", e);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ // now wait on the future for the result
+ DtxCoordinationCommitResult result = future.get();
+ int status = result.getStatus();
+ switch (status)
+ {
+ case Constant.XA_OK:
+ // do nothing this ok
+ break;
+ case Constant.XA_HEURHAZ:
+ throw new XAException(XAException.XA_HEURHAZ);
+ case Constant.XA_HEURCOM:
+ throw new XAException(XAException.XA_HEURCOM);
+ case Constant.XA_HEURRB:
+ throw new XAException(XAException.XA_HEURRB);
+ case Constant.XA_HEURMIX:
+ throw new XAException(XAException.XA_HEURMIX);
+ case Constant.XA_RBROLLBACK:
+ throw new XAException(XAException.XA_RBROLLBACK);
+ case Constant.XA_RBTIMEOUT:
+ throw new XAException(XAException.XA_RBTIMEOUT);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("got unexpected status value: ", status);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ }
+
+ /**
+ * Ends the work performed on behalf of a transaction branch.
+ * The resource manager disassociates the XA resource from the transaction branch specified
+ * and lets the transaction complete.
+ * <ul>
+ * <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily suspended in an incomplete state.
+ * The transaction context is in a suspended state and must be resumed via the start method with TMRESUME specified.
+ * <li> If TMFAIL is specified, the portion of work has failed. The resource manager may mark the transaction as rollback-only
+ * <li> If TMSUCCESS is specified, the portion of work has completed successfully.
+ * /ul>
+ *
+ * @param xid A global transaction identifier that is the same as the identifier used previously in the start method
+ * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND.
+ * @throws XAException An error has occurred. An error has occurred. Possible XAException values are XAER_RMERR,
+ * XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*.
+ */
+ public void end(Xid xid, int flag) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("end ", xid);
+ }
+ if (xid == null)
+ {
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ Future<DtxDemarcationEndResult> future;
+ try
+ {
+ future = _xaSession.getQpidSession()
+ .dtxDemarcationEnd(XidImpl.convertToString(xid),
+ flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
+ flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Cannot convert Xid into String format ", e);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ // now wait on the future for the result
+ DtxDemarcationEndResult result = future.get();
+ int status = result.getStatus();
+ switch (status)
+ {
+ case Constant.XA_OK:
+ // do nothing this ok
+ break;
+ case Constant.XA_RBROLLBACK:
+ throw new XAException(XAException.XA_RBROLLBACK);
+ case Constant.XA_RBTIMEOUT:
+ throw new XAException(XAException.XA_RBTIMEOUT);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("got unexpected status value: ", status);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ }
+
+ /**
+ * Tells the resource manager to forget about a heuristically completed transaction branch.
+ *
+ * @param xid String(xid.getGlobalTransactionId() A global transaction identifier
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL,
+ * XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+ */
+ public void forget(Xid xid) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("forget ", xid);
+ }
+ if (xid == null)
+ {
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ _xaSession.getQpidSession().dtxCoordinationForget(new String(xid.getGlobalTransactionId()));
+ }
+
+ /**
+ * Obtains the current transaction timeout value set for this XAResource instance.
+ * If XAResource.setTransactionTimeout was not used prior to invoking this method,
+ * the return value is the default timeout i.e. 0;
+ * otherwise, the value used in the previous setTransactionTimeout call is returned.
+ *
+ * @return The transaction timeout value in seconds.
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
+ */
+ public int getTransactionTimeout() throws XAException
+ {
+ int result = 0;
+ if (_xid != null)
+ {
+ try
+ {
+ Future<DtxCoordinationGetTimeoutResult> future =
+ _xaSession.getQpidSession().dtxCoordinationGetTimeout(XidImpl.convertToString(_xid));
+ result = (int) future.get().getTimeout();
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Cannot convert Xid into String format ", e);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * This method is called to determine if the resource manager instance represented
+ * by the target object is the same as the resouce manager instance represented by
+ * the parameter xaResource.
+ *
+ * @param xaResource An XAResource object whose resource manager instance is to
+ * be compared with the resource manager instance of the target object
+ * @return <code>true</code> if it's the same RM instance; otherwise <code>false</code>.
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
+ */
+ public boolean isSameRM(XAResource xaResource) throws XAException
+ {
+ // TODO : get the server identity of xaResource and compare it with our own one
+ return false;
+ }
+
+ /**
+ * Prepare for a transaction commit of the transaction specified in <code>Xid</code>.
+ *
+ * @param xid A global transaction identifier.
+ * @return A value indicating the resource manager's vote on the outcome of the transaction.
+ * The possible values are: XA_RDONLY or XA_OK.
+ * @throws XAException An error has occurred. Possible exception values are: XAER_RMERR or XAER_NOTA
+ */
+ public int prepare(Xid xid) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("prepare ", xid);
+ }
+ if (xid == null)
+ {
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ Future<DtxCoordinationPrepareResult> future;
+ try
+ {
+ future = _xaSession.getQpidSession()
+ .dtxCoordinationPrepare(XidImpl.convertToString(xid));
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Cannot convert Xid into String format ", e);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ DtxCoordinationPrepareResult result = future.get();
+ int status = result.getStatus();
+ int outcome;
+ switch (status)
+ {
+ case Constant.XA_OK:
+ outcome = XAResource.XA_OK;
+ break;
+ case Constant.XA_RDONLY:
+ outcome = XAResource.XA_RDONLY;
+ break;
+ case Constant.XA_RBROLLBACK:
+ throw new XAException(XAException.XA_RBROLLBACK);
+ case Constant.XA_RBTIMEOUT:
+ throw new XAException(XAException.XA_RBTIMEOUT);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("got unexpected status value: ", status);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ return outcome;
+ }
+
+ /**
+ * Obtains a list of prepared transaction branches.
+ * <p/>
+ * The transaction manager calls this method during recovery to obtain the list of transaction branches
+ * that are currently in prepared or heuristically completed states.
+ *
+ * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS.
+ * TMNOFLAGS must be used when no other flags are set in the parameter.
+ * @return zero or more XIDs of the transaction branches that are currently in a prepared or heuristically
+ * completed state.
+ * @throws XAException An error has occurred. Possible value is XAER_INVAL.
+ */
+ public Xid[] recover(int flag) throws XAException
+ {
+ // the flag is ignored
+ Future<DtxCoordinationRecoverResult> future = _xaSession.getQpidSession().dtxCoordinationRecover();
+ DtxCoordinationRecoverResult res = future.get();
+ // todo make sure that the keys of the returned map are the xids
+ Xid[] result = new Xid[res.getInDoubt().size()];
+ int i = 0;
+ try
+ {
+ for (String xid : res.getInDoubt().keySet())
+ {
+ result[i] = new XidImpl(xid);
+ i++;
+ }
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Cannot convert string into Xid ", e);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ return result;
+ }
+
+ /**
+ * Informs the resource manager to roll back work done on behalf of a transaction branch
+ *
+ * @param xid A global transaction identifier.
+ * @throws XAException An error has occurred.
+ */
+ public void rollback(Xid xid) throws XAException
+ {
+ if (xid == null)
+ {
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ // the flag is ignored
+ Future<DtxCoordinationRollbackResult> future;
+ try
+ {
+ future = _xaSession.getQpidSession()
+ .dtxCoordinationRollback(XidImpl.convertToString(xid));
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Cannot convert Xid into String format ", e);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ // now wait on the future for the result
+ DtxCoordinationRollbackResult result = future.get();
+ int status = result.getStatus();
+ switch (status)
+ {
+ case Constant.XA_OK:
+ // do nothing this ok
+ break;
+ case Constant.XA_HEURHAZ:
+ throw new XAException(XAException.XA_HEURHAZ);
+ case Constant.XA_HEURCOM:
+ throw new XAException(XAException.XA_HEURCOM);
+ case Constant.XA_HEURRB:
+ throw new XAException(XAException.XA_HEURRB);
+ case Constant.XA_HEURMIX:
+ throw new XAException(XAException.XA_HEURMIX);
+ case Constant.XA_RBROLLBACK:
+ throw new XAException(XAException.XA_RBROLLBACK);
+ case Constant.XA_RBTIMEOUT:
+ throw new XAException(XAException.XA_RBTIMEOUT);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("got unexpected status value: ", status);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ }
+
+ /**
+ * Sets the current transaction timeout value for this XAResource instance.
+ * Once set, this timeout value is effective until setTransactionTimeout is
+ * invoked again with a different value.
+ * To reset the timeout value to the default value used by the resource manager, set the value to zero.
+ *
+ * @param timeout The transaction timeout value in seconds.
+ * @return true if transaction timeout value is set successfully; otherwise false.
+ * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, or XAER_INVAL.
+ */
+ public boolean setTransactionTimeout(int timeout) throws XAException
+ {
+ boolean result = false;
+ if (_xid != null)
+ {
+ try
+ {
+ _xaSession.getQpidSession()
+ .dtxCoordinationSetTimeout(XidImpl.convertToString(_xid), timeout);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Cannot convert Xid into String format ", e);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ result = true;
+ }
+ return result;
+ }
+
+ /**
+ * Starts work on behalf of a transaction branch specified in xid.
+ * <ul>
+ * <li> If TMJOIN is specified, an exception is thrown as it is not supported
+ * <li> If TMRESUME is specified, the start applies to resuming a suspended transaction specified in the parameter xid.
+ * <li> If neither TMJOIN nor TMRESUME is specified and the transaction specified by xid has previously been seen by the
+ * resource manager, the resource manager throws the XAException exception with XAER_DUPID error code.
+ * </ul>
+ *
+ * @param xid A global transaction identifier to be associated with the resource
+ * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME
+ * @throws XAException An error has occurred. Possible exceptions
+ * are XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+ */
+ public void start(Xid xid, int flag) throws XAException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("start ", xid);
+ }
+ if (xid == null)
+ {
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ _xid = xid;
+ Future<DtxDemarcationStartResult> future;
+ try
+ {
+ future = _xaSession.getQpidSession()
+ .dtxDemarcationStart(XidImpl.convertToString(xid),
+ flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
+ flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Cannot convert Xid into String format ", e);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ // now wait on the future for the result
+ DtxDemarcationStartResult result = future.get();
+ int status = result.getStatus();
+ switch (status)
+ {
+ case Constant.XA_OK:
+ // do nothing this ok
+ break;
+ case Constant.XA_RBROLLBACK:
+ throw new XAException(XAException.XA_RBROLLBACK);
+ case Constant.XA_RBTIMEOUT:
+ throw new XAException(XAException.XA_RBTIMEOUT);
+ default:
+ // this should not happen
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("got unexpected status value: ", status);
+ }
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XASessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XASessionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XASessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XASessionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,126 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.nclient.DtxSession;
+
+import javax.jms.XASession;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.TransactionInProgressException;
+import javax.transaction.xa.XAResource;
+
+/**
+ * This is an implementation of the javax.njms.XASEssion interface.
+ */
+public class XASessionImpl extends SessionImpl implements XASession
+{
+ /**
+ * XAResource associated with this XASession
+ */
+ private final XAResourceImpl _xaResource;
+
+ /**
+ * This XASession Qpid DtxSession
+ */
+ private DtxSession _qpidDtxSession;
+
+ /**
+ * The standard session
+ */
+ private Session _jmsSession;
+
+ //-- Constructors
+ /**
+ * Create a JMS XASession
+ *
+ * @param connection The ConnectionImpl object from which the Session is created.
+ * @throws QpidException In case of internal error.
+ */
+ protected XASessionImpl(ConnectionImpl connection) throws QpidException
+ {
+ super(connection, true, // this is a transacted session
+ Session.SESSION_TRANSACTED, // the ack mode is transacted
+ true); // this is an XA session so do not set tx
+ _qpidDtxSession = getConnection().getQpidConnection().createDTXSession(0);
+ _xaResource = new XAResourceImpl(this);
+ }
+
+ //--- javax.njms.XASEssion API
+
+ /**
+ * Gets the session associated with this XASession.
+ *
+ * @return The session object.
+ * @throws JMSException if an internal error occurs.
+ */
+ public Session getSession() throws JMSException
+ {
+ if( _jmsSession == null )
+ {
+ _jmsSession = getConnection().createSession(true, getAcknowledgeMode());
+ }
+ return _jmsSession;
+ }
+
+ /**
+ * Returns an XA resource.
+ *
+ * @return An XA resource.
+ */
+ public XAResource getXAResource()
+ {
+ return _xaResource;
+ }
+
+ //-- overwritten mehtods
+ /**
+ * Throws a {@link TransactionInProgressException}, since it should
+ * not be called for an XASession object.
+ *
+ * @throws TransactionInProgressException always.
+ */
+ public void commit() throws JMSException
+ {
+ throw new TransactionInProgressException(
+ "XASession: A direct invocation of the commit operation is probibited!");
+ }
+
+ /**
+ * Throws a {@link TransactionInProgressException}, since it should
+ * not be called for an XASession object.
+ *
+ * @throws TransactionInProgressException always.
+ */
+ public void rollback() throws JMSException
+ {
+ throw new TransactionInProgressException(
+ "XASession: A direct invocation of the rollback operation is probibited!");
+ }
+
+ /**
+ * Access to the underlying Qpid Session
+ *
+ * @return The associated Qpid Session.
+ */
+ protected org.apache.qpidity.nclient.DtxSession getQpidSession()
+ {
+ return _qpidDtxSession;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XASessionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicConnectionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicConnectionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicConnectionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,71 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.XATopicConnection;
+import javax.jms.JMSException;
+import javax.jms.XATopicSession;
+
+/**
+ * implements javax.njms.XATopicConnection
+ */
+public class XATopicConnectionImpl extends XAConnectionImpl implements XATopicConnection
+{
+ //-- constructor
+ /**
+ * Create a XATopicConnection.
+ *
+ * @param host The broker host name.
+ * @param port The port on which the broker is listening for connection.
+ * @param virtualHost The virtual host on which the broker is deployed.
+ * @param username The user name used of user identification.
+ * @param password The password name used of user identification.
+ * @throws QpidException If creating a XATopicConnection fails due to some internal error.
+ */
+ public XATopicConnectionImpl(String host, int port, String virtualHost, String username, String password)
+ throws QpidException
+ {
+ super(host, port, virtualHost, username, password);
+ }
+
+ /**
+ * Creates an XATopicSession.
+ *
+ * @return A newly created XATopicSession.
+ * @throws JMSException If the XAConnectiono fails to create an XATopicSession due to
+ * some internal error.
+ */
+ public synchronized XATopicSession createXATopicSession() throws JMSException
+ {
+ checkNotClosed();
+ XATopicSessionImpl xaTopicSession;
+ try
+ {
+ xaTopicSession = new XATopicSessionImpl(this);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ // add this session with the list of session that are handled by this connection
+ _sessions.add(xaTopicSession);
+ return xaTopicSession;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicConnectionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicSessionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicSessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicSessionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,63 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.*;
+
+/**
+ * Implements javax.njms.XATopicSession and javax.njms.TopicSession
+ */
+public class XATopicSessionImpl extends XASessionImpl implements XATopicSession
+{
+ /**
+ * The standard session
+ */
+ private TopicSession _jmsTopicSession;
+
+ //-- Constructors
+ /**
+ * Create a JMS XASession
+ *
+ * @param connection The ConnectionImpl object from which the Session is created.
+ * @throws org.apache.qpidity.QpidException
+ * In case of internal error.
+ */
+ protected XATopicSessionImpl(ConnectionImpl connection) throws QpidException
+ {
+ super(connection);
+ }
+
+ //--- interface XATopicSession
+
+ /**
+ * Gets the topic session associated with this <CODE>XATopicSession</CODE>.
+ *
+ * @return the topic session object
+ * @throws JMSException If an internal error occurs.
+ */
+ public TopicSession getTopicSession() throws JMSException
+ {
+ if (_jmsTopicSession == null)
+ {
+ _jmsTopicSession = getConnection().createTopicSession(true, getAcknowledgeMode());
+ }
+ return _jmsTopicSession;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicSessionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/BytesMessageImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/BytesMessageImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/BytesMessageImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,863 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms.message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpidity.QpidException;
+
+import javax.jms.*;
+import java.io.*;
+import java.nio.ByteBuffer;
+
+/**
+ * Implements javax.njms.BytesMessage
+ */
+public class BytesMessageImpl extends MessageImpl implements BytesMessage
+{
+ /**
+ * this BytesMessageImpl's logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(BytesMessageImpl.class);
+
+ /**
+ * An input stream for reading this message data
+ * This stream wrappes the received byteBuffer.
+ */
+ protected DataInputStream _dataIn = null;
+
+ /**
+ * Used to store written data.
+ */
+ protected ByteArrayOutputStream _storedData = new ByteArrayOutputStream();
+
+ /**
+ * DataOutputStream used to write the data
+ */
+ protected DataOutputStream _dataOut = new DataOutputStream(_storedData);
+
+ //--- Constructor
+ /**
+ * Constructor used by SessionImpl.
+ */
+ public BytesMessageImpl()
+ {
+ super();
+ setMessageType(String.valueOf(MessageFactory.JAVAX_JMS_BYTESMESSAGE));
+ }
+
+ /**
+ * Constructor used by MessageFactory
+ *
+ * @param message The new qpid message.
+ * @throws QpidException In case of problem when receiving the message body.
+ */
+ protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws QpidException
+ {
+ super(message);
+ }
+
+ //--- BytesMessage API
+ /**
+ * Gets the number of bytes of the message body when the message
+ * is in read-only mode.
+ * <p> The value returned is the entire length of the message
+ * body, regardless of where the pointer for reading the message
+ * is currently located.
+ *
+ * @return Number of bytes in the message
+ * @throws JMSException If reading the message body length fails due to some error.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public long getBodyLength() throws JMSException
+ {
+ isReadable();
+ return getMessageData().capacity();
+ }
+
+ /**
+ * Reads a boolean.
+ *
+ * @return The boolean value read
+ * @throws JMSException If reading fails due to some error.
+ * @throws MessageEOFException If unexpected end of message data has been reached.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public boolean readBoolean() throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.readBoolean();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException("Reach end of data when reading message data");
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Reads a signed 8-bit.
+ *
+ * @return The signed 8-bit read
+ * @throws JMSException If reading a signed 8-bit fails due to some error.
+ * @throws MessageEOFException If unexpected end of message data has been reached.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public byte readByte() throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.readByte();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException("Reach end of data when reading message data");
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Reads an unsigned 8-bit.
+ *
+ * @return The signed 8-bit read
+ * @throws JMSException If reading an unsigned 8-bit fails due to some error.
+ * @throws MessageEOFException If unexpected end of message data has been reached.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public int readUnsignedByte() throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.readUnsignedByte();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException("Reach end of data when reading message data");
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Reads a short.
+ *
+ * @return The short read
+ * @throws JMSException If reading a short fails due to some error.
+ * @throws MessageEOFException If unexpected end of message data has been reached.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public short readShort() throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.readShort();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException("Reach end of data when reading message data");
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Reads an unsigned short.
+ *
+ * @return The unsigned short read
+ * @throws JMSException If reading an unsigned short fails due to some error.
+ * @throws MessageEOFException If unexpected end of message data has been reached.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public int readUnsignedShort() throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.readUnsignedShort();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException("Reach end of data when reading message data");
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Reads a char.
+ *
+ * @return The char read
+ * @throws JMSException If reading a char fails due to some error.
+ * @throws MessageEOFException If unexpected end of message data has been reached.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public char readChar() throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.readChar();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException("Reach end of data when reading message data");
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Reads an int.
+ *
+ * @return The int read
+ * @throws JMSException If reading an int fails due to some error.
+ * @throws MessageEOFException If unexpected end of message data has been reached.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public int readInt() throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.readInt();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException("Reach end of data when reading message data");
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Reads a long.
+ *
+ * @return The long read
+ * @throws JMSException If reading a long fails due to some error.
+ * @throws MessageEOFException If unexpected end of message data has been reached.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public long readLong() throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.readLong();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException("Reach end of data when reading message data");
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Read a float.
+ *
+ * @return The float read
+ * @throws JMSException If reading a float fails due to some error.
+ * @throws MessageEOFException If unexpected end of message data has been reached.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public float readFloat() throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.readFloat();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException("Reach end of data when reading message data");
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Read a double.
+ *
+ * @return The double read
+ * @throws JMSException If reading a double fails due to some error.
+ * @throws MessageEOFException If unexpected end of message data has been reached.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public double readDouble() throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.readDouble();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException("Reach end of data when reading message data");
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Reads a string that has been encoded using a modified UTF-8 format.
+ *
+ * @return The String read
+ * @throws JMSException If reading a String fails due to some error.
+ * @throws MessageEOFException If unexpected end of message data has been reached.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public String readUTF() throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.readUTF();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException("Reach end of data when reading message data");
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Reads a byte array from the bytes message data.
+ * <p> JMS sepc says:
+ * <P>If the length of array <code>bytes</code> is less than the number of
+ * bytes remaining to be read from the stream, the array should
+ * be filled. A subsequent call reads the next increment, and so on.
+ * <P>If the number of bytes remaining in the stream is less than the
+ * length of
+ * array <code>bytes</code>, the bytes should be read into the array.
+ * The return value of the total number of bytes read will be less than
+ * the length of the array, indicating that there are no more bytes left
+ * to be read from the stream. The next read of the stream returns -1.
+ *
+ * @param b The array into which the data is read.
+ * @return The total number of bytes read into the buffer, or -1 if
+ * there is no more data because the end of the stream has been reached
+ * @throws JMSException If reading a byte array fails due to some error.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public int readBytes(byte[] b) throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.read(b);
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Reads a portion of the bytes message data.
+ * <p> The JMS spec says
+ * <P>If the length of array <code>b</code> is less than the number of
+ * bytes remaining to be read from the stream, the array should
+ * be filled. A subsequent call reads the next increment, and so on.
+ * <P>If the number of bytes remaining in the stream is less than the
+ * length of array <code>b</code>, the bytes should be read into the array.
+ * The return value of the total number of bytes read will be less than
+ * the length of the array, indicating that there are no more bytes left
+ * to be read from the stream. The next read of the stream returns -1.
+ * <p> If <code>length</code> is negative, or
+ * <code>length</code> is greater than the length of the array
+ * <code>b</code>, then an <code>IndexOutOfBoundsException</code> is
+ * thrown. No bytes will be read from the stream for this exception case.
+ *
+ * @param b The buffer into which the data is read
+ * @param length The number of bytes to read; must be less than or equal to length.
+ * @return The total number of bytes read into the buffer, or -1 if
+ * there is no more data because the end of the data has been reached
+ * @throws JMSException If reading a byte array fails due to some error.
+ * @throws javax.jms.MessageNotReadableException
+ * If the message is in write-only mode.
+ */
+ public int readBytes(byte[] b, int length) throws JMSException
+ {
+ isReadable();
+ try
+ {
+ return _dataIn.read(b, 0, length);
+ }
+ catch (IOException ioe)
+ {
+ throw new JMSException("Problem when reading data", ioe.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Writes a boolean to the bytes message.
+ *
+ * @param val The boolean value to be written
+ * @throws JMSException If writting a boolean fails due to some error.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeBoolean(boolean val) throws JMSException
+ {
+ isWriteable();
+ try
+ {
+ _dataOut.writeBoolean(val);
+ }
+ catch (IOException e)
+ {
+ throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Writes a byte to the bytes message.
+ *
+ * @param val The byte value to be written
+ * @throws JMSException If writting a byte fails due to some error.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeByte(byte val) throws JMSException
+ {
+ isWriteable();
+ try
+ {
+ _dataOut.writeByte(val);
+ }
+ catch (IOException e)
+ {
+ throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Writes a short to the bytes message.
+ *
+ * @param val The short value to be written
+ * @throws JMSException If writting a short fails due to some error.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeShort(short val) throws JMSException
+ {
+ isWriteable();
+ try
+ {
+ _dataOut.writeShort(val);
+ }
+ catch (IOException e)
+ {
+ throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
+ }
+
+ }
+
+ /**
+ * Writes a char to the bytes message.
+ *
+ * @param c The char value to be written
+ * @throws JMSException If writting a char fails due to some error.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeChar(char c) throws JMSException
+ {
+ isWriteable();
+ try
+ {
+ _dataOut.writeChar(c);
+ }
+ catch (IOException e)
+ {
+ throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Writes an int to the bytes message.
+ *
+ * @param val The int value to be written
+ * @throws JMSException If writting an int fails due to some error.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeInt(int val) throws JMSException
+ {
+ isWriteable();
+ try
+ {
+ _dataOut.writeInt(val);
+ }
+ catch (IOException e)
+ {
+ throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
+ }
+
+ }
+
+ /**
+ * Writes a long to the bytes message.
+ *
+ * @param val The long value to be written
+ * @throws JMSException If writting a long fails due to some error.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeLong(long val) throws JMSException
+ {
+ isWriteable();
+ try
+ {
+ _dataOut.writeLong(val);
+ }
+ catch (IOException e)
+ {
+ throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Writes a float to the bytes message.
+ *
+ * @param val The float value to be written
+ * @throws JMSException If writting a float fails due to some error.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeFloat(float val) throws JMSException
+ {
+ isWriteable();
+ try
+ {
+ _dataOut.writeFloat(val);
+ }
+ catch (IOException e)
+ {
+ throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Writes a double to the bytes message.
+ *
+ * @param val The double value to be written
+ * @throws JMSException If writting a double fails due to some error.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeDouble(double val) throws JMSException
+ {
+ isWriteable();
+ try
+ {
+ _dataOut.writeDouble(val);
+ }
+ catch (IOException e)
+ {
+ throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Writes a string to the bytes message.
+ *
+ * @param val The string value to be written
+ * @throws JMSException If writting a string fails due to some error.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeUTF(String val) throws JMSException
+ {
+ isWriteable();
+ try
+ {
+ _dataOut.writeUTF(val);
+ }
+ catch (IOException e)
+ {
+ throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
+ }
+
+ }
+
+ /**
+ * Writes a byte array to the bytes message.
+ *
+ * @param bytes The byte array value to be written
+ * @throws JMSException If writting a byte array fails due to some error.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeBytes(byte[] bytes) throws JMSException
+ {
+ isWriteable();
+ try
+ {
+ _dataOut.write(bytes);
+ }
+ catch (IOException e)
+ {
+ throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Writes a portion of byte array to the bytes message.
+ *
+ * @param val The byte array value to be written
+ * @throws JMSException If writting a byte array fails due to some error.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeBytes(byte[] val, int offset, int length) throws JMSException
+ {
+ isWriteable();
+ try
+ {
+ _dataOut.write(val, offset, length);
+ }
+ catch (IOException e)
+ {
+ throw new JMSException("IO problem when writting " + e.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Writes an Object to the bytes message.
+ * JMS spec says:
+ * <p>This method works only for the objectified primitive
+ * object types Integer, Double, Long, String and byte
+ * arrays.
+ *
+ * @param val The short value to be written
+ * @throws JMSException If writting a short fails due to some error.
+ * @throws NullPointerException if the parameter val is null.
+ * @throws MessageFormatException If the object is of an invalid type.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void writeObject(Object val) throws JMSException
+ {
+ if (val == null)
+ {
+ throw new NullPointerException("Cannot write null value to message");
+ }
+ if (val instanceof byte[])
+ {
+ writeBytes((byte[]) val);
+ }
+ else if (val instanceof String)
+ {
+ writeUTF((String) val);
+ }
+ else if (val instanceof Boolean)
+ {
+ writeBoolean(((Boolean) val).booleanValue());
+ }
+ else if (val instanceof Number)
+ {
+ if (val instanceof Byte)
+ {
+ writeByte(((Byte) val).byteValue());
+ }
+ else if (val instanceof Short)
+ {
+ writeShort(((Short) val).shortValue());
+ }
+ else if (val instanceof Integer)
+ {
+ writeInt(((Integer) val).intValue());
+ }
+ else if (val instanceof Long)
+ {
+ writeLong(((Long) val).longValue());
+ }
+ else if (val instanceof Float)
+ {
+ writeFloat(((Float) val).floatValue());
+ }
+ else if (val instanceof Double)
+ {
+ writeDouble(((Double) val).doubleValue());
+ }
+ else
+ {
+ throw new MessageFormatException("Trying to write an invalid obejct type: " + val);
+ }
+ }
+ else if (val instanceof Character)
+ {
+ writeChar(((Character) val).charValue());
+ }
+ else
+ {
+ throw new MessageFormatException("Trying to write an invalid obejct type: " + val);
+ }
+ }
+
+ /**
+ * Puts the message body in read-only mode and repositions the stream of
+ * bytes to the beginning.
+ *
+ * @throws JMSException If resetting the message fails due to some internal error.
+ * @throws MessageFormatException If the message has an invalid format.
+ */
+ public void reset() throws JMSException
+ {
+ _readOnly = true;
+ if (_dataIn == null)
+ {
+ // We were writting on this messsage so now read it
+ _dataIn = new DataInputStream(new ByteArrayInputStream(_storedData.toByteArray()));
+ }
+ else
+ {
+ // We were reading so reset it
+ try
+ {
+ _dataIn.reset();
+ }
+ catch (IOException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ // we log this exception as this should not happen
+ _logger.debug("Problem when resetting message: ", e);
+ }
+ throw new JMSException("Problem when resetting message: " + e.getLocalizedMessage());
+ }
+ }
+ }
+
+ //-- overwritten methods
+ /**
+ * Clear out the message body. Clearing a message's body does not clear
+ * its header values or property entries.
+ * <p>If this message body was read-only, calling this method leaves
+ * the message body is in the same state as an empty body in a newly
+ * created message.
+ *
+ * @throws JMSException If clearing this message body fails to due to some error.
+ */
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _dataIn = null;
+ _storedData = new ByteArrayOutputStream();
+ _dataOut = new DataOutputStream(_storedData);
+ }
+
+
+ /**
+ * This method is invoked before a message dispatch operation.
+ *
+ * @throws org.apache.qpidity.QpidException
+ * If the destination is not set
+ */
+ public void beforeMessageDispatch() throws QpidException
+ {
+ if (_dataOut.size() > 0)
+ {
+ setMessageData(ByteBuffer.wrap(_storedData.toByteArray()));
+ }
+ super.beforeMessageDispatch();
+ }
+
+ /**
+ * This method is invoked after this message is received.
+ *
+ * @throws QpidException
+ */
+ @Override
+ public void afterMessageReceive() throws QpidException
+ {
+ super.afterMessageReceive();
+ ByteBuffer messageData = getMessageData();
+ if (messageData != null)
+ {
+ try
+ {
+ _dataIn = new DataInputStream(asInputStream());
+ }
+ catch (Exception e)
+ {
+ throw new QpidException("Cannot retrieve data from message ", null, e);
+ }
+ }
+ }
+
+ //-- helper mehtods
+ /**
+ * Test whether this message is readable by throwing a MessageNotReadableException if this
+ * message cannot be read.
+ *
+ * @throws MessageNotReadableException If this message cannot be read.
+ */
+ protected void isReadable() throws MessageNotReadableException
+ {
+ if (_dataIn == null)
+ {
+ throw new MessageNotReadableException("Cannot read this message");
+ }
+ }
+}
+
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/BytesMessageImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MapMessageImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MapMessageImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MapMessageImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,628 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms.message;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.MapMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Vector;
+import java.io.*;
+import java.nio.ByteBuffer;
+
+/**
+ * Implements javax.njms.MapMessage
+ */
+public class MapMessageImpl extends MessageImpl implements MapMessage
+{
+
+ /**
+ * The MapMessage's payload.
+ */
+ private Map<String, Object> _map = new HashMap<String, Object>();
+
+ //--- Constructor
+ /**
+ * Constructor used by SessionImpl.
+ */
+ public MapMessageImpl()
+ {
+ super();
+ setMessageType(String.valueOf(MessageFactory.JAVAX_JMS_MAPMESSAGE));
+ }
+
+ /**
+ * Constructor used by MessageFactory
+ *
+ * @param message The new qpid message.
+ * @throws QpidException In case of IO problem when reading the received message.
+ */
+ protected MapMessageImpl(org.apache.qpidity.api.Message message) throws QpidException
+ {
+ super(message);
+ }
+
+ //-- Map Message API
+ /**
+ * Indicates whether an key exists in this MapMessage.
+ *
+ * @param key the name of the key to test
+ * @return true if the key exists
+ * @throws JMSException If determinein if the key exists fails due to some internal error
+ */
+ public boolean itemExists(String key) throws JMSException
+ {
+ return _map.containsKey(key);
+ }
+
+ /**
+ * Returns the booleanvalue with the specified key.
+ *
+ * @param key The key name.
+ * @return The boolean value with the specified key.
+ * @throws JMSException If reading the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If this type conversion is invalid.
+ */
+ public boolean getBoolean(String key) throws JMSException
+ {
+ boolean result = false;
+ if (_map.containsKey(key))
+ {
+ try
+ {
+ Object objValue = _map.get(key);
+ if (objValue != null)
+ {
+ result = MessageHelper.convertToBoolean(_map.get(key));
+ }
+ }
+ catch (ClassCastException e)
+ {
+ throw new MessageFormatException("Wrong type for key: " + key);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Returns the byte value with the specified name.
+ *
+ * @param key The key name.
+ * @return The byte value with the specified key.
+ * @throws JMSException If reading the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If this type conversion is invalid.
+ */
+ public byte getByte(String key) throws JMSException
+ {
+ Object objValue = _map.get(key);
+ if (objValue == null)
+ {
+ throw new NumberFormatException("Wrong type for key: " + key);
+ }
+ return MessageHelper.convertToByte(objValue);
+ }
+
+ /**
+ * Returns the <CODE>short</CODE> value with the specified name.
+ *
+ * @param key The key name.
+ * @return The <CODE>short</CODE> value with the specified key.
+ * @throws JMSException If reading the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If this type conversion is invalid.
+ */
+ public short getShort(String key) throws JMSException
+ {
+ Object objValue = _map.get(key);
+ if (objValue == null)
+ {
+ throw new NumberFormatException("Wrong type for key: " + key);
+ }
+ return MessageHelper.convertToShort(objValue);
+ }
+
+ /**
+ * Returns the Unicode character value with the specified name.
+ *
+ * @param key The key name.
+ * @return The Unicode charactervalue with the specified key.
+ * @throws JMSException If reading the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If this type conversion is invalid.
+ */
+ public char getChar(String key) throws JMSException
+ {
+ Object objValue = _map.get(key);
+ if (objValue == null)
+ {
+ throw new java.lang.NullPointerException();
+ }
+ return MessageHelper.convertToChar(objValue);
+ }
+
+ /**
+ * Returns the intvalue with the specified name.
+ *
+ * @param key The key name.
+ * @return The int value with the specified key.
+ * @throws JMSException If reading the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If this type conversion is invalid.
+ */
+ public int getInt(String key) throws JMSException
+ {
+ Object objValue = _map.get(key);
+ if (objValue == null)
+ {
+ throw new NumberFormatException("Wrong type for key: " + key);
+ }
+ return MessageHelper.convertToInt(objValue);
+ }
+
+ /**
+ * Returns the longvalue with the specified name.
+ *
+ * @param key The key name.
+ * @return The long value with the specified key.
+ * @throws JMSException If reading the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If this type conversion is invalid.
+ */
+ public long getLong(String key) throws JMSException
+ {
+ Object objValue = _map.get(key);
+ if (objValue == null)
+ {
+ throw new NumberFormatException("Wrong type for key: " + key);
+ }
+ return MessageHelper.convertToLong(objValue);
+ }
+
+ /**
+ * Returns the float value with the specified name.
+ *
+ * @param key The key name.
+ * @return The float value with the specified key.
+ * @throws JMSException If reading the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If this type conversion is invalid.
+ */
+ public float getFloat(String key) throws JMSException
+ {
+ Object objValue = _map.get(key);
+ if (objValue == null)
+ {
+ throw new NumberFormatException("Wrong type for key: " + key);
+ }
+ return MessageHelper.convertToFloat(objValue);
+ }
+
+ /**
+ * Returns the double value with the specified name.
+ *
+ * @param key The key name.
+ * @return The double value with the specified key.
+ * @throws JMSException If reading the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If this type conversion is invalid.
+ */
+ public double getDouble(String key) throws JMSException
+ {
+ Object objValue = _map.get(key);
+ if (objValue == null)
+ {
+ throw new NumberFormatException("Wrong type for key: " + key);
+ }
+ return MessageHelper.convertToDouble(objValue);
+ }
+
+ /**
+ * Returns the String value with the specified name.
+ *
+ * @param key The key name.
+ * @return The String value with the specified key.
+ * @throws JMSException If reading the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If this type conversion is invalid.
+ */
+ public String getString(String key) throws JMSException
+ {
+ String result = null;
+ Object objValue = _map.get(key);
+ if (objValue != null)
+ {
+ if (objValue instanceof byte[])
+ {
+ throw new NumberFormatException("Wrong type for key: " + key);
+ }
+ else
+ {
+ result = objValue.toString();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Returns the byte array value with the specified name.
+ *
+ * @param key The key name.
+ * @return The byte value with the specified key.
+ * @throws JMSException If reading the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If this type conversion is invalid.
+ */
+ public byte[] getBytes(String key) throws JMSException
+ {
+ Object objValue = _map.get(key);
+ if (objValue == null)
+ {
+ return null;
+ }
+ if (objValue instanceof byte[])
+ {
+ byte[] value = (byte[]) objValue;
+ byte[] toReturn = new byte[value.length];
+ System.arraycopy(value, 0, toReturn, 0, value.length);
+ return toReturn;
+ }
+ throw new MessageFormatException("Wrong type for key: " + key);
+ }
+
+ /**
+ * Returns the value of the object with the specified name.
+ *
+ * @param key The key name.
+ * @return The byte value with the specified key.
+ * @throws JMSException If reading the message fails due to some internal error.
+ */
+ public Object getObject(String key) throws JMSException
+ {
+ try
+ {
+ Object objValue = _map.get(key);
+ if (objValue == null)
+ {
+ return null;
+ }
+ else if (objValue instanceof byte[])
+ {
+ byte[] value = (byte[]) objValue;
+ byte[] toReturn = new byte[value.length];
+ System.arraycopy(value, 0, toReturn, 0, value.length);
+ return toReturn;
+ }
+ else
+ {
+ return objValue;
+ }
+ }
+ catch (java.lang.ClassCastException cce)
+ {
+ throw new MessageFormatException("Wrong type for key: " + key);
+ }
+ }
+
+ /**
+ * Returns an Enumeration of all the keys
+ *
+ * @return an enumeration of all the keys in this MapMessage
+ * @throws JMSException If reading the message fails due to some internal error.
+ */
+ public Enumeration getMapNames() throws JMSException
+ {
+ Vector<String> propVector = new Vector<String>(_map.keySet());
+ return propVector.elements();
+ }
+
+ /**
+ * Sets a boolean value with the specified key into the Map.
+ *
+ * @param key The key name.
+ * @param value The boolean value to set in the Map.
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setBoolean(String key, boolean value) throws JMSException, IllegalArgumentException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ _map.put(key, value);
+ }
+
+ /**
+ * Sets a byte value with the specified name into the Map.
+ *
+ * @param key The key name.
+ * @param value The byte value to set in the Map.
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setByte(String key, byte value) throws JMSException, IllegalArgumentException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ _map.put(key, value);
+ }
+
+ /**
+ * Sets a shortvalue with the specified name into the Map.
+ *
+ * @param key The key name.
+ * @param value The short value to set in the Map.
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setShort(String key, short value) throws JMSException, IllegalArgumentException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ _map.put(key, value);
+ }
+
+ /**
+ * Sets a Unicode character value with the specified name into the Map.
+ *
+ * @param key The key name.
+ * @param value The character value to set in the Map.
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setChar(String key, char value) throws JMSException, IllegalArgumentException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ _map.put(key, value);
+ }
+
+ /**
+ * Sets an intvalue with the specified name into the Map.
+ *
+ * @param key The key name.
+ * @param value The int value to set in the Map.
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setInt(String key, int value) throws JMSException, IllegalArgumentException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ _map.put(key, value);
+ }
+
+ /**
+ * Sets a long value with the specified name into the Map.
+ *
+ * @param key The key name.
+ * @param value The long value to set in the Map.
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setLong(String key, long value) throws JMSException, IllegalArgumentException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ _map.put(key, value);
+ }
+
+ /**
+ * Sets a float value with the specified name into the Map.
+ *
+ * @param key The key name.
+ * @param value The float value to set in the Map.
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setFloat(String key, float value) throws JMSException, IllegalArgumentException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ _map.put(key, value);
+ }
+
+ /**
+ * Sets a double value with the specified name into the Map.
+ *
+ * @param key The key name.
+ * @param value The double value to set in the Map.
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setDouble(String key, double value) throws JMSException, IllegalArgumentException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ _map.put(key, value);
+ }
+
+ /**
+ * Sets a String value with the specified name into the Map.
+ *
+ * @param key The key name.
+ * @param value The String value to set in the Map.
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setString(String key, String value) throws JMSException, IllegalArgumentException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ _map.put(key, value);
+ }
+
+ /**
+ * Sets a byte array value with the specified name into the Map.
+ *
+ * @param key the name of the byte array
+ * @param value the byte array value to set in the Map; the array
+ * is copied so that the value for <CODE>name</CODE> will
+ * not be altered by future modifications
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setBytes(String key, byte[] value) throws JMSException, NullPointerException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ byte[] newBytes = new byte[value.length];
+ System.arraycopy(value, 0, newBytes, 0, value.length);
+ _map.put(key, value);
+ }
+
+ /**
+ * Sets a portion of the byte array value with the specified name into the
+ * Map.
+ *
+ * @param key the name of the byte array
+ * @param value the byte array value to set in the Map; the array
+ * is copied so that the value for <CODE>name</CODE> will
+ * not be altered by future modifications
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setBytes(String key, byte[] value, int offset, int length) throws JMSException, IllegalArgumentException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ byte[] newBytes = new byte[length];
+ System.arraycopy(value, offset, newBytes, 0, length);
+ _map.put(key, newBytes);
+ }
+
+ /**
+ * Sets an object value with the specified name into the Map.
+ *
+ * @param key the name of the byte array
+ * @param value the byte array value to set in the Map; the array
+ * is copied so that the value for <CODE>name</CODE> will
+ * not be altered by future modifications
+ * @throws JMSException If writting the message fails due to some internal error.
+ * @throws IllegalArgumentException If the key is nul or an empty string.
+ * @throws javax.jms.MessageNotWriteableException
+ * If the message is in read-only mode.
+ */
+ public void setObject(String key, Object value) throws JMSException, IllegalArgumentException
+ {
+ isWriteable();
+ checkNotNullKey(key);
+ if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer) || (value instanceof Long) || (value instanceof Character) || (value instanceof Float) || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null))
+ {
+ _map.put(key, value);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot set property " + key + " to value " + value + "of type " + value
+ .getClass().getName() + ".");
+ }
+ }
+
+ //-- Overwritten methods
+ /**
+ * This method is invoked before this message is dispatched.
+ */
+ @Override
+ public void beforeMessageDispatch() throws QpidException
+ {
+ try
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(_map);
+ byte[] bytes = baos.toByteArray();
+ setMessageData(ByteBuffer.wrap(bytes));
+ }
+ catch (java.io.IOException ioe)
+ {
+ throw new QpidException("problem when dispatching message", null, ioe);
+ }
+ super.beforeMessageDispatch();
+ }
+
+
+ /**
+ * This method is invoked after this message has been received.
+ */
+ @Override
+ public void afterMessageReceive() throws QpidException
+ {
+ super.afterMessageReceive();
+ ByteBuffer messageData = getMessageData();
+ if (messageData != null)
+ {
+ try
+ {
+ ObjectInputStream ois = new ObjectInputStream(asInputStream());
+ _map = (Map<String, Object>) ois.readObject();
+ }
+ catch (IOException ioe)
+ {
+ throw new QpidException(
+ "Unexpected error during rebuild of message in afterReceive(): " + "- IO Exception", null, ioe);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new QpidException(
+ "Unexpected error during rebuild of message in afterReceive(): " + "- Could not find the required class in classpath.",
+ null, e);
+ }
+ }
+ }
+
+ //-- protected methods
+ /**
+ * This method throws an <CODE>IllegalArgumentException</CODE> if the supplied parameter is null.
+ *
+ * @param key The key to check.
+ * @throws IllegalArgumentException If the key is null.
+ */
+ private void checkNotNullKey(String key) throws IllegalArgumentException
+ {
+ if (key == null || key.equals(""))
+ {
+ throw new IllegalArgumentException("Key cannot be null");
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MapMessageImpl.java
------------------------------------------------------------------------------
svn:eol-style = native