You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/25 16:33:07 UTC
svn commit: r1526190 [5/7] - in /qpid/trunk/qpid/java/amqp-1-0-client-jms:
example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/
src/main/java/org/apache/qpid/amqp_1_0/jms/
src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ src/main/java/org/apache...
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java Wed Sep 25 14:33:06 2013
@@ -1,489 +1,489 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
-import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
-import org.apache.qpid.amqp_1_0.client.Sender;
-import org.apache.qpid.amqp_1_0.jms.MessageProducer;
-import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
-import org.apache.qpid.amqp_1_0.jms.QueueSender;
-import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
-import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import java.util.UUID;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
-{
- private boolean _disableMessageID;
- private boolean _disableMessageTimestamp;
- private int _deliveryMode = Message.DEFAULT_DELIVERY_MODE;
- private int _priority = Message.DEFAULT_PRIORITY;
- private long _timeToLive;
-
- private DestinationImpl _destination;
- private SessionImpl _session;
- private Sender _sender;
- private boolean _closed;
- private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish");
- private long _syncPublishTimeout = Long.getLong("qpid.sync_publish_timeout", 30000l);
-
- protected MessageProducerImpl(final Destination destination,
- final SessionImpl session) throws JMSException
- {
- if(destination instanceof DestinationImpl)
- {
- _destination = (DestinationImpl) destination;
- }
- else if(destination != null)
- {
- throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
- }
-
- _session = session;
-
- if(_destination != null)
- {
- try
- {
- _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
- }
- catch (Sender.SenderCreationException e)
- {
- // TODO - refine exception
- JMSException jmsEx = new JMSException(e.getMessage());
- jmsEx.initCause(e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
- catch (ConnectionClosedException e)
- {
-
- // TODO - refine exception
- JMSException jmsEx = new JMSException(e.getMessage());
- jmsEx.initCause(e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
- _sender.setRemoteErrorListener(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
-
- if(exceptionListener != null)
- {
- final org.apache.qpid.amqp_1_0.type.transport.Error receiverError = _sender.getError();
- exceptionListener.onException(new JMSException(receiverError.getDescription(),
- receiverError.getCondition().getValue().toString()));
-
- }
- }
- catch (JMSException e)
- {
-
- }
- }
- });
- }
- }
-
- private void checkClosed() throws IllegalStateException
- {
- if(_closed)
- {
- throw new javax.jms.IllegalStateException("Producer closed");
- }
- }
-
- public boolean getDisableMessageID() throws IllegalStateException
- {
- checkClosed();
- return _disableMessageID;
- }
-
- public void setDisableMessageID(final boolean disableMessageID) throws IllegalStateException
- {
- checkClosed();
- _disableMessageID = disableMessageID;
- }
-
- public boolean getDisableMessageTimestamp() throws IllegalStateException
- {
- checkClosed();
- return _disableMessageTimestamp;
- }
-
- public void setDisableMessageTimestamp(final boolean disableMessageTimestamp) throws IllegalStateException
- {
- checkClosed();
- _disableMessageTimestamp = disableMessageTimestamp;
- }
-
- public int getDeliveryMode() throws IllegalStateException
- {
- checkClosed();
- return _deliveryMode;
- }
-
- public void setDeliveryMode(final int deliveryMode) throws IllegalStateException
- {
- checkClosed();
- _deliveryMode = deliveryMode;
- }
-
- public int getPriority() throws IllegalStateException
- {
- checkClosed();
- return _priority;
- }
-
- public void setPriority(final int priority) throws IllegalStateException
- {
- checkClosed();
- _priority = priority;
- }
-
- public long getTimeToLive() throws IllegalStateException
- {
- checkClosed();
- return _timeToLive;
- }
-
- public void setTimeToLive(final long timeToLive) throws IllegalStateException
- {
- checkClosed();
- _timeToLive = timeToLive;
- }
-
- public DestinationImpl getDestination() throws JMSException
- {
- checkClosed();
- return _destination;
- }
-
- public void close() throws JMSException
- {
- try
- {
- if(!_closed)
- {
- _closed = true;
- if(_sender != null)
- {
- _sender.close();
- }
- }
-
- }
- catch (Sender.SenderClosingException e)
- {
- final JMSException jmsException = new JMSException("error closing");
- jmsException.setLinkedException(e);
- throw jmsException;
- }
- }
-
- public void send(final Message message) throws JMSException
- {
- send(message, getDeliveryMode(), getPriority(), getTimeToLive());
- }
-
- public void send(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
- {
- if(_sender == null)
- {
- throw new UnsupportedOperationException("No Destination provided");
- }
- if(_destination instanceof TemporaryDestination && ((TemporaryDestination)_destination).isDeleted())
- {
- throw new IllegalStateException("Destination is deleted");
- }
-
-
- //TODO
- MessageImpl msg;
- if(message instanceof org.apache.qpid.amqp_1_0.jms.Message)
- {
- msg = (MessageImpl) message;
- }
- else
- {
- msg = _session.convertMessage(message);
- }
-
-
-
- msg.setJMSDeliveryMode(deliveryMode);
- msg.setJMSPriority(priority);
-
- msg.setJMSDestination(_destination);
-
- long timestamp = 0l;
-
- if(!getDisableMessageTimestamp() || ttl != 0)
- {
- timestamp = System.currentTimeMillis();
- msg.setJMSTimestamp(timestamp);
-
- }
- if(ttl != 0)
- {
- msg.setTtl(UnsignedInteger.valueOf(ttl));
- }
- else
- {
- msg.setTtl(null);
- }
-
- if(!getDisableMessageID() && msg.getMessageId() == null)
- {
- final Object messageId = generateMessageId();
- msg.setMessageId(messageId);
-
- }
-
- if(message != msg)
- {
- message.setJMSTimestamp(msg.getJMSTimestamp());
- message.setJMSMessageID(msg.getJMSMessageID());
- message.setJMSDeliveryMode(msg.getJMSDeliveryMode());
- message.setJMSPriority(msg.getJMSPriority());
- message.setJMSExpiration(msg.getJMSExpiration());
- }
-
-
- final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
-
- DispositionAction action = null;
-
- if(_syncPublish)
- {
- action = new DispositionAction(_sender);
- }
-
- try
- {
- _sender.send(clientMessage, _session.getTxn(), action);
- }
- catch (LinkDetachedException e)
- {
- JMSException jmsException = new InvalidDestinationException("Sender has been closed");
- jmsException.setLinkedException(e);
- throw jmsException;
- }
-
- if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + System.currentTimeMillis()))
- {
- throw new MessageRejectedException("Message was rejected");
- }
-
- if(getDestination() != null)
- {
- message.setJMSDestination(getDestination());
- }
- }
-
- public void send(final javax.jms.Queue queue, final Message message) throws JMSException
- {
- send((Destination)queue, message);
- }
-
- public void send(final javax.jms.Queue queue, final Message message, final int deliveryMode, final int priority, final long ttl)
- throws JMSException
- {
- send((Destination)queue, message, deliveryMode, priority, ttl);
- }
-
- private Object generateMessageId()
- {
- UUID uuid = UUID.randomUUID();
- final String messageIdString = uuid.toString();
- return _session.getConnection().useBinaryMessageId() ? new Binary(messageIdString.getBytes()) : messageIdString;
- }
-
- public void send(final Destination destination, final Message message) throws JMSException
- {
- send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
- }
-
- public void send(final Destination destination, final Message message, final int deliveryMode, final int priority, final long ttl)
- throws JMSException
- {
-
- checkClosed();
- if(destination == null)
- {
- send(message, deliveryMode, priority, ttl);
- }
- else
- {
- if(_destination != null)
- {
- throw new UnsupportedOperationException("Cannot use explicit destination pon non-anonymous producer");
- }
- else if(!(destination instanceof DestinationImpl))
- {
- throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
- }
- else if(destination instanceof TemporaryDestination && ((TemporaryDestination)destination).isDeleted())
- {
- throw new IllegalStateException("Destination has been deleted");
- }
- try
- {
- _destination = (DestinationImpl) destination;
- _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
-
- send(message, deliveryMode, priority, ttl);
-
- _sender.close();
-
-
-
- }
- catch (Sender.SenderCreationException e)
- {
- // TODO - refine exception
- JMSException jmsEx = new JMSException(e.getMessage());
- jmsEx.initCause(e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
- catch (Sender.SenderClosingException e)
- {
- JMSException jmsEx = new JMSException(e.getMessage());
- jmsEx.initCause(e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
- catch (ConnectionClosedException e)
- {
-
- JMSException jmsEx = new JMSException(e.getMessage());
- jmsEx.initCause(e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
- finally
- {
- _sender = null;
- _destination = null;
- }
- }
- }
-
- public QueueImpl getQueue() throws JMSException
- {
- return (QueueImpl) getDestination();
- }
-
- public TopicImpl getTopic() throws JMSException
- {
- return (TopicImpl) getDestination();
- }
-
- public void publish(final Message message) throws JMSException
- {
- send(message);
- }
-
- public void publish(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
- {
- send(message, deliveryMode, priority, ttl);
- }
-
- public void publish(final Topic topic, final Message message) throws JMSException
- {
- send(topic, message);
- }
-
- public void publish(final Topic topic, final Message message, final int deliveryMode, final int priority, final long ttl)
- throws JMSException
- {
- send(topic, message, deliveryMode, priority, ttl);
- }
-
- private static class DispositionAction implements Sender.OutcomeAction
- {
- private final Sender _sender;
- private final Object _lock;
- private Outcome _outcome;
-
- public DispositionAction(Sender sender)
- {
- _sender = sender;
- _lock = sender.getEndpoint().getLock();
- }
-
- @Override
- public void onOutcome(Binary deliveryTag, Outcome outcome)
- {
- synchronized (_lock)
- {
- _outcome = outcome;
- _lock.notifyAll();
- }
- }
-
- public boolean wasAccepted(long timeout) throws JMSException
- {
- synchronized(_lock)
- {
- while(_outcome == null && !_sender.getEndpoint().isDetached())
- {
- try
- {
- _lock.wait(timeout - System.currentTimeMillis());
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
- if(_outcome == null)
- {
-
- if(_sender.getEndpoint().isDetached())
- {
- throw new JMSException("Link was detached");
- }
- else
- {
- throw new JMSException("Timed out waiting for message acceptance");
- }
- }
- else
- {
- return _outcome instanceof Accepted;
- }
- }
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
+import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
+import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import java.util.UUID;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
+{
+ private boolean _disableMessageID;
+ private boolean _disableMessageTimestamp;
+ private int _deliveryMode = Message.DEFAULT_DELIVERY_MODE;
+ private int _priority = Message.DEFAULT_PRIORITY;
+ private long _timeToLive;
+
+ private DestinationImpl _destination;
+ private SessionImpl _session;
+ private Sender _sender;
+ private boolean _closed;
+ private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish");
+ private long _syncPublishTimeout = Long.getLong("qpid.sync_publish_timeout", 30000l);
+
+ protected MessageProducerImpl(final Destination destination,
+ final SessionImpl session) throws JMSException
+ {
+ if(destination instanceof DestinationImpl)
+ {
+ _destination = (DestinationImpl) destination;
+ }
+ else if(destination != null)
+ {
+ throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
+ }
+
+ _session = session;
+
+ if(_destination != null)
+ {
+ try
+ {
+ _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ // TODO - refine exception
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ catch (ConnectionClosedException e)
+ {
+
+ // TODO - refine exception
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ _sender.setRemoteErrorListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
+
+ if(exceptionListener != null)
+ {
+ final org.apache.qpid.amqp_1_0.type.transport.Error receiverError = _sender.getError();
+ exceptionListener.onException(new JMSException(receiverError.getDescription(),
+ receiverError.getCondition().getValue().toString()));
+
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+ }
+ });
+ }
+ }
+
+ private void checkClosed() throws IllegalStateException
+ {
+ if(_closed)
+ {
+ throw new javax.jms.IllegalStateException("Producer closed");
+ }
+ }
+
+ public boolean getDisableMessageID() throws IllegalStateException
+ {
+ checkClosed();
+ return _disableMessageID;
+ }
+
+ public void setDisableMessageID(final boolean disableMessageID) throws IllegalStateException
+ {
+ checkClosed();
+ _disableMessageID = disableMessageID;
+ }
+
+ public boolean getDisableMessageTimestamp() throws IllegalStateException
+ {
+ checkClosed();
+ return _disableMessageTimestamp;
+ }
+
+ public void setDisableMessageTimestamp(final boolean disableMessageTimestamp) throws IllegalStateException
+ {
+ checkClosed();
+ _disableMessageTimestamp = disableMessageTimestamp;
+ }
+
+ public int getDeliveryMode() throws IllegalStateException
+ {
+ checkClosed();
+ return _deliveryMode;
+ }
+
+ public void setDeliveryMode(final int deliveryMode) throws IllegalStateException
+ {
+ checkClosed();
+ _deliveryMode = deliveryMode;
+ }
+
+ public int getPriority() throws IllegalStateException
+ {
+ checkClosed();
+ return _priority;
+ }
+
+ public void setPriority(final int priority) throws IllegalStateException
+ {
+ checkClosed();
+ _priority = priority;
+ }
+
+ public long getTimeToLive() throws IllegalStateException
+ {
+ checkClosed();
+ return _timeToLive;
+ }
+
+ public void setTimeToLive(final long timeToLive) throws IllegalStateException
+ {
+ checkClosed();
+ _timeToLive = timeToLive;
+ }
+
+ public DestinationImpl getDestination() throws JMSException
+ {
+ checkClosed();
+ return _destination;
+ }
+
+ public void close() throws JMSException
+ {
+ try
+ {
+ if(!_closed)
+ {
+ _closed = true;
+ if(_sender != null)
+ {
+ _sender.close();
+ }
+ }
+
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ final JMSException jmsException = new JMSException("error closing");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+ }
+
+ public void send(final Message message) throws JMSException
+ {
+ send(message, getDeliveryMode(), getPriority(), getTimeToLive());
+ }
+
+ public void send(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
+ {
+ if(_sender == null)
+ {
+ throw new UnsupportedOperationException("No Destination provided");
+ }
+ if(_destination instanceof TemporaryDestination && ((TemporaryDestination)_destination).isDeleted())
+ {
+ throw new IllegalStateException("Destination is deleted");
+ }
+
+
+ //TODO
+ MessageImpl msg;
+ if(message instanceof org.apache.qpid.amqp_1_0.jms.Message)
+ {
+ msg = (MessageImpl) message;
+ }
+ else
+ {
+ msg = _session.convertMessage(message);
+ }
+
+
+
+ msg.setJMSDeliveryMode(deliveryMode);
+ msg.setJMSPriority(priority);
+
+ msg.setJMSDestination(_destination);
+
+ long timestamp = 0l;
+
+ if(!getDisableMessageTimestamp() || ttl != 0)
+ {
+ timestamp = System.currentTimeMillis();
+ msg.setJMSTimestamp(timestamp);
+
+ }
+ if(ttl != 0)
+ {
+ msg.setTtl(UnsignedInteger.valueOf(ttl));
+ }
+ else
+ {
+ msg.setTtl(null);
+ }
+
+ if(!getDisableMessageID() && msg.getMessageId() == null)
+ {
+ final Object messageId = generateMessageId();
+ msg.setMessageId(messageId);
+
+ }
+
+ if(message != msg)
+ {
+ message.setJMSTimestamp(msg.getJMSTimestamp());
+ message.setJMSMessageID(msg.getJMSMessageID());
+ message.setJMSDeliveryMode(msg.getJMSDeliveryMode());
+ message.setJMSPriority(msg.getJMSPriority());
+ message.setJMSExpiration(msg.getJMSExpiration());
+ }
+
+
+ final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
+
+ DispositionAction action = null;
+
+ if(_syncPublish)
+ {
+ action = new DispositionAction(_sender);
+ }
+
+ try
+ {
+ _sender.send(clientMessage, _session.getTxn(), action);
+ }
+ catch (LinkDetachedException e)
+ {
+ JMSException jmsException = new InvalidDestinationException("Sender has been closed");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+
+ if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + System.currentTimeMillis()))
+ {
+ throw new MessageRejectedException("Message was rejected");
+ }
+
+ if(getDestination() != null)
+ {
+ message.setJMSDestination(getDestination());
+ }
+ }
+
+ public void send(final javax.jms.Queue queue, final Message message) throws JMSException
+ {
+ send((Destination)queue, message);
+ }
+
+ public void send(final javax.jms.Queue queue, final Message message, final int deliveryMode, final int priority, final long ttl)
+ throws JMSException
+ {
+ send((Destination)queue, message, deliveryMode, priority, ttl);
+ }
+
+ private Object generateMessageId()
+ {
+ UUID uuid = UUID.randomUUID();
+ final String messageIdString = uuid.toString();
+ return _session.getConnection().useBinaryMessageId() ? new Binary(messageIdString.getBytes()) : messageIdString;
+ }
+
+ public void send(final Destination destination, final Message message) throws JMSException
+ {
+ send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
+ }
+
+ public void send(final Destination destination, final Message message, final int deliveryMode, final int priority, final long ttl)
+ throws JMSException
+ {
+
+ checkClosed();
+ if(destination == null)
+ {
+ send(message, deliveryMode, priority, ttl);
+ }
+ else
+ {
+ if(_destination != null)
+ {
+ throw new UnsupportedOperationException("Cannot use explicit destination pon non-anonymous producer");
+ }
+ else if(!(destination instanceof DestinationImpl))
+ {
+ throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
+ }
+ else if(destination instanceof TemporaryDestination && ((TemporaryDestination)destination).isDeleted())
+ {
+ throw new IllegalStateException("Destination has been deleted");
+ }
+ try
+ {
+ _destination = (DestinationImpl) destination;
+ _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
+
+ send(message, deliveryMode, priority, ttl);
+
+ _sender.close();
+
+
+
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ // TODO - refine exception
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ catch (ConnectionClosedException e)
+ {
+
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ finally
+ {
+ _sender = null;
+ _destination = null;
+ }
+ }
+ }
+
+ public QueueImpl getQueue() throws JMSException
+ {
+ return (QueueImpl) getDestination();
+ }
+
+ public TopicImpl getTopic() throws JMSException
+ {
+ return (TopicImpl) getDestination();
+ }
+
+ public void publish(final Message message) throws JMSException
+ {
+ send(message);
+ }
+
+ public void publish(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
+ {
+ send(message, deliveryMode, priority, ttl);
+ }
+
+ public void publish(final Topic topic, final Message message) throws JMSException
+ {
+ send(topic, message);
+ }
+
+ public void publish(final Topic topic, final Message message, final int deliveryMode, final int priority, final long ttl)
+ throws JMSException
+ {
+ send(topic, message, deliveryMode, priority, ttl);
+ }
+
+ private static class DispositionAction implements Sender.OutcomeAction
+ {
+ private final Sender _sender;
+ private final Object _lock;
+ private Outcome _outcome;
+
+ public DispositionAction(Sender sender)
+ {
+ _sender = sender;
+ _lock = sender.getEndpoint().getLock();
+ }
+
+ @Override
+ public void onOutcome(Binary deliveryTag, Outcome outcome)
+ {
+ synchronized (_lock)
+ {
+ _outcome = outcome;
+ _lock.notifyAll();
+ }
+ }
+
+ public boolean wasAccepted(long timeout) throws JMSException
+ {
+ synchronized(_lock)
+ {
+ while(_outcome == null && !_sender.getEndpoint().isDetached())
+ {
+ try
+ {
+ _lock.wait(timeout - System.currentTimeMillis());
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if(_outcome == null)
+ {
+
+ if(_sender.getEndpoint().isDetached())
+ {
+ throw new JMSException("Link was detached");
+ }
+ else
+ {
+ throw new JMSException("Timed out waiting for message acceptance");
+ }
+ }
+ else
+ {
+ return _outcome instanceof Accepted;
+ }
+ }
+ }
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java Wed Sep 25 14:33:06 2013
@@ -1,161 +1,161 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.ObjectMessage;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import javax.jms.JMSException;
-import javax.jms.MessageNotWriteableException;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.*;
-
-public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
-{
- static final Symbol CONTENT_TYPE = Symbol.valueOf("application/x-java-serialized-object");
-
- static final Data NULL_OBJECT_DATA;
- static
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try
- {
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(null);
- oos.flush();
- oos.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- NULL_OBJECT_DATA = new Data(new Binary(baos.toByteArray()));
- }
-
- private Data _objectData = NULL_OBJECT_DATA;
-
- protected ObjectMessageImpl(Header header,
- MessageAnnotations messageAnnotations,
- Properties properties,
- ApplicationProperties appProperties,
- Data dataSection,
- Footer footer,
- SessionImpl session)
- {
- super(header, messageAnnotations, properties, appProperties, footer, session);
- getProperties().setContentType(CONTENT_TYPE);
- Serializable serializable = null;
- _objectData = dataSection;
-
- }
-
- protected ObjectMessageImpl(final SessionImpl session)
- {
- super(new Header(), new MessageAnnotations(new HashMap()),
- new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
- session);
- getProperties().setContentType(CONTENT_TYPE);
- }
-
- public void setObject(final Serializable serializable) throws MessageNotWriteableException
- {
- checkWritable();
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try
- {
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(serializable);
- oos.flush();
- oos.close();
-
- _objectData = new Data(new Binary(baos.toByteArray()));
-
- }
- catch (IOException e)
- {
- e.printStackTrace(); //TODO
- }
- }
-
- public Serializable getObject() throws JMSException
- {
-
- if(_objectData == null)
- {
- return null;
- }
-
- Binary data = _objectData.getValue();
-
- try
- {
- ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data.getArray(), data.getArrayOffset(), data.getLength()));
- return (Serializable) ois.readObject();
- }
- catch (IOException e)
- {
- JMSException jmsException = new JMSException(e.getMessage());
- jmsException.setLinkedException(e);
- throw jmsException;
- }
- catch (ClassNotFoundException e)
- {
-
- JMSException jmsException = new JMSException(e.getMessage());
- jmsException.setLinkedException(e);
- throw jmsException;
- }
-
- }
-
- @Override
- public void clearBody() throws JMSException
- {
- super.clearBody();
- _objectData = null;
- }
-
- @Override Collection<Section> getSections()
- {
- List<Section> sections = new ArrayList<Section>();
- sections.add(getHeader());
- if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
- {
- sections.add(getMessageAnnotations());
- }
- sections.add(getProperties());
- sections.add(getApplicationProperties());
-
- sections.add(_objectData);
-
- sections.add(getFooter());
- return sections;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.ObjectMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.*;
+
+public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
+{
+ static final Symbol CONTENT_TYPE = Symbol.valueOf("application/x-java-serialized-object");
+
+ static final Data NULL_OBJECT_DATA;
+ static
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try
+ {
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(null);
+ oos.flush();
+ oos.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ NULL_OBJECT_DATA = new Data(new Binary(baos.toByteArray()));
+ }
+
+ private Data _objectData = NULL_OBJECT_DATA;
+
+ protected ObjectMessageImpl(Header header,
+ MessageAnnotations messageAnnotations,
+ Properties properties,
+ ApplicationProperties appProperties,
+ Data dataSection,
+ Footer footer,
+ SessionImpl session)
+ {
+ super(header, messageAnnotations, properties, appProperties, footer, session);
+ getProperties().setContentType(CONTENT_TYPE);
+ Serializable serializable = null;
+ _objectData = dataSection;
+
+ }
+
+ protected ObjectMessageImpl(final SessionImpl session)
+ {
+ super(new Header(), new MessageAnnotations(new HashMap()),
+ new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ session);
+ getProperties().setContentType(CONTENT_TYPE);
+ }
+
+ public void setObject(final Serializable serializable) throws MessageNotWriteableException
+ {
+ checkWritable();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try
+ {
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(serializable);
+ oos.flush();
+ oos.close();
+
+ _objectData = new Data(new Binary(baos.toByteArray()));
+
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //TODO
+ }
+ }
+
+ public Serializable getObject() throws JMSException
+ {
+
+ if(_objectData == null)
+ {
+ return null;
+ }
+
+ Binary data = _objectData.getValue();
+
+ try
+ {
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data.getArray(), data.getArrayOffset(), data.getLength()));
+ return (Serializable) ois.readObject();
+ }
+ catch (IOException e)
+ {
+ JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+ catch (ClassNotFoundException e)
+ {
+
+ JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _objectData = null;
+ }
+
+ @Override Collection<Section> getSections()
+ {
+ List<Section> sections = new ArrayList<Section>();
+ sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
+ sections.add(getProperties());
+ sections.add(getApplicationProperties());
+
+ sections.add(_objectData);
+
+ sections.add(getFooter());
+ return sections;
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java Wed Sep 25 14:33:06 2013
@@ -1,202 +1,202 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Receiver;
-import org.apache.qpid.amqp_1_0.jms.QueueBrowser;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
-
-public class QueueBrowserImpl implements QueueBrowser
-{
- private static final String JMS_SELECTOR = "jms-selector";
- private QueueImpl _queue;
- private String _selector;
- private final SessionImpl _session;
- private Map<Symbol, Filter> _filters;
- private HashSet<MessageEnumeration> _enumerations = new HashSet<MessageEnumeration>();
- private boolean _closed;
-
- QueueBrowserImpl(final QueueImpl queue, final String selector, SessionImpl session) throws JMSException
- {
- _queue = queue;
- _selector = selector;
- _session = session;
-
-
- if(selector == null || selector.trim().equals(""))
- {
- _filters = null;
- }
- else
- {
- _filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
- // We do this just to have the server validate the filter..
- new MessageEnumeration().close();
- }
- }
-
- public QueueImpl getQueue()
- {
- return _queue;
- }
-
- public String getMessageSelector()
- {
- return _selector;
- }
-
- public Enumeration getEnumeration() throws JMSException
- {
- if(_closed)
- {
- throw new IllegalStateException("Browser has been closed");
- }
- return new MessageEnumeration();
- }
-
- public void close() throws JMSException
- {
- _closed = true;
- for(MessageEnumeration me : new ArrayList<MessageEnumeration>(_enumerations))
- {
- me.close();
- }
- }
-
- private final class MessageEnumeration implements Enumeration<MessageImpl>
- {
- private Receiver _receiver;
- private MessageImpl _nextElement;
- private boolean _needNext = true;
-
- MessageEnumeration() throws JMSException
- {
- try
- {
- _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue),
- StdDistMode.COPY,
- AcknowledgeMode.AMO, null,
- false,
- _filters, null);
- _receiver.setCredit(UnsignedInteger.valueOf(100), true);
- }
- catch(ConnectionErrorException e)
- {
- org.apache.qpid.amqp_1_0.type.transport.Error error = e.getRemoteError();
- if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
- {
- throw new InvalidSelectorException(e.getMessage());
- }
- else
- {
- throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
- }
-
- }
- _enumerations.add(this);
-
- }
-
- public void close()
- {
- _enumerations.remove(this);
- _receiver.close();
- _receiver = null;
- }
-
- @Override
- public boolean hasMoreElements()
- {
- if( _receiver == null )
- {
- return false;
- }
- if( _needNext )
- {
- _needNext = false;
- _nextElement = createJMSMessage(_receiver.receive(0L));
- if( _nextElement == null )
- {
- // Drain to verify there really are no more messages.
- _receiver.drain();
- _receiver.drainWait();
- _nextElement = createJMSMessage(_receiver.receive(0L));
- if( _nextElement == null )
- {
- close();
- }
- else
- {
- // there are still more messages, open up the credit window again..
- _receiver.clearDrain();
- }
- }
- }
- return _nextElement != null;
- }
-
- @Override
- public MessageImpl nextElement()
- {
- if( hasMoreElements() )
- {
- MessageImpl message = _nextElement;
- _nextElement = null;
- _needNext = true;
- return message;
- }
- else
- {
- throw new NoSuchElementException();
- }
- }
- }
-
- MessageImpl createJMSMessage(final Message msg)
- {
- if(msg != null)
- {
- final MessageImpl message = _session.getMessageFactory().createMessage(_queue, msg);
- message.setFromQueue(true);
- message.setFromTopic(false);
- return message;
- }
- else
- {
- return null;
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.jms.QueueBrowser;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+
+public class QueueBrowserImpl implements QueueBrowser
+{
+ private static final String JMS_SELECTOR = "jms-selector";
+ private QueueImpl _queue;
+ private String _selector;
+ private final SessionImpl _session;
+ private Map<Symbol, Filter> _filters;
+ private HashSet<MessageEnumeration> _enumerations = new HashSet<MessageEnumeration>();
+ private boolean _closed;
+
+ QueueBrowserImpl(final QueueImpl queue, final String selector, SessionImpl session) throws JMSException
+ {
+ _queue = queue;
+ _selector = selector;
+ _session = session;
+
+
+ if(selector == null || selector.trim().equals(""))
+ {
+ _filters = null;
+ }
+ else
+ {
+ _filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
+ // We do this just to have the server validate the filter..
+ new MessageEnumeration().close();
+ }
+ }
+
+ public QueueImpl getQueue()
+ {
+ return _queue;
+ }
+
+ public String getMessageSelector()
+ {
+ return _selector;
+ }
+
+ public Enumeration getEnumeration() throws JMSException
+ {
+ if(_closed)
+ {
+ throw new IllegalStateException("Browser has been closed");
+ }
+ return new MessageEnumeration();
+ }
+
+ public void close() throws JMSException
+ {
+ _closed = true;
+ for(MessageEnumeration me : new ArrayList<MessageEnumeration>(_enumerations))
+ {
+ me.close();
+ }
+ }
+
+ private final class MessageEnumeration implements Enumeration<MessageImpl>
+ {
+ private Receiver _receiver;
+ private MessageImpl _nextElement;
+ private boolean _needNext = true;
+
+ MessageEnumeration() throws JMSException
+ {
+ try
+ {
+ _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue),
+ StdDistMode.COPY,
+ AcknowledgeMode.AMO, null,
+ false,
+ _filters, null);
+ _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+ }
+ catch(ConnectionErrorException e)
+ {
+ org.apache.qpid.amqp_1_0.type.transport.Error error = e.getRemoteError();
+ if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
+ {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ else
+ {
+ throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+ }
+
+ }
+ _enumerations.add(this);
+
+ }
+
+ public void close()
+ {
+ _enumerations.remove(this);
+ _receiver.close();
+ _receiver = null;
+ }
+
+ @Override
+ public boolean hasMoreElements()
+ {
+ if( _receiver == null )
+ {
+ return false;
+ }
+ if( _needNext )
+ {
+ _needNext = false;
+ _nextElement = createJMSMessage(_receiver.receive(0L));
+ if( _nextElement == null )
+ {
+ // Drain to verify there really are no more messages.
+ _receiver.drain();
+ _receiver.drainWait();
+ _nextElement = createJMSMessage(_receiver.receive(0L));
+ if( _nextElement == null )
+ {
+ close();
+ }
+ else
+ {
+ // there are still more messages, open up the credit window again..
+ _receiver.clearDrain();
+ }
+ }
+ }
+ return _nextElement != null;
+ }
+
+ @Override
+ public MessageImpl nextElement()
+ {
+ if( hasMoreElements() )
+ {
+ MessageImpl message = _nextElement;
+ _nextElement = null;
+ _needNext = true;
+ return message;
+ }
+ else
+ {
+ throw new NoSuchElementException();
+ }
+ }
+ }
+
+ MessageImpl createJMSMessage(final Message msg)
+ {
+ if(msg != null)
+ {
+ final MessageImpl message = _session.getMessageFactory().createMessage(_queue, msg);
+ message.setFromQueue(true);
+ message.setFromTopic(false);
+ return message;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java Wed Sep 25 14:33:06 2013
@@ -1,48 +1,48 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.QueueConnection;
-
-import javax.jms.ConnectionConsumer;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.ServerSessionPool;
-
-public class QueueConnectionImpl extends ConnectionImpl implements QueueConnection
-{
- QueueConnectionImpl(String host, int port, String username, String password, String clientId)
- throws JMSException
- {
- super(host, port, username, password, clientId);
- }
-
- public QueueSessionImpl createQueueSession(final boolean b, final int i) throws JMSException
- {
- return null; //TODO
- }
-
- public ConnectionConsumer createConnectionConsumer(final Queue queue,
- final String s,
- final ServerSessionPool serverSessionPool,
- final int i) throws JMSException
- {
- return null; //TODO
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.QueueConnection;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.ServerSessionPool;
+
+public class QueueConnectionImpl extends ConnectionImpl implements QueueConnection
+{
+ QueueConnectionImpl(String host, int port, String username, String password, String clientId)
+ throws JMSException
+ {
+ super(host, port, username, password, clientId);
+ }
+
+ public QueueSessionImpl createQueueSession(final boolean b, final int i) throws JMSException
+ {
+ return null; //TODO
+ }
+
+ public ConnectionConsumer createConnectionConsumer(final Queue queue,
+ final String s,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ return null; //TODO
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueConnectionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java Wed Sep 25 14:33:06 2013
@@ -1,56 +1,56 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.Queue;
-
-import java.util.WeakHashMap;
-
-public class QueueImpl extends DestinationImpl implements Queue
-{
- private static final WeakHashMap<String, QueueImpl> QUEUE_CACHE =
- new WeakHashMap<String, QueueImpl>();
-
- public QueueImpl(String address)
- {
- super(address);
- }
-
- public String getQueueName()
- {
- return getAddress();
- }
-
- public static synchronized QueueImpl createQueue(final String address)
- {
- QueueImpl queue = QUEUE_CACHE.get(address);
- if(queue == null)
- {
- queue = new QueueImpl(address);
- QUEUE_CACHE.put(address, queue);
- }
- return queue;
- }
-
- public static QueueImpl valueOf(String address)
- {
- return address == null ? null : createQueue(address);
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.Queue;
+
+import java.util.WeakHashMap;
+
+public class QueueImpl extends DestinationImpl implements Queue
+{
+ private static final WeakHashMap<String, QueueImpl> QUEUE_CACHE =
+ new WeakHashMap<String, QueueImpl>();
+
+ public QueueImpl(String address)
+ {
+ super(address);
+ }
+
+ public String getQueueName()
+ {
+ return getAddress();
+ }
+
+ public static synchronized QueueImpl createQueue(final String address)
+ {
+ QueueImpl queue = QUEUE_CACHE.get(address);
+ if(queue == null)
+ {
+ queue = new QueueImpl(address);
+ QUEUE_CACHE.put(address, queue);
+ }
+ return queue;
+ }
+
+ public static QueueImpl valueOf(String address)
+ {
+ return address == null ? null : createQueue(address);
+ }
+
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java Wed Sep 25 14:33:06 2013
@@ -1,56 +1,56 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import javax.jms.JMSException;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.Receiver;
-import org.apache.qpid.amqp_1_0.jms.Queue;
-import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
-
-public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
-{
- QueueReceiverImpl(final QueueImpl destination,
- final SessionImpl session,
- final String selector,
- final boolean noLocal)
- throws JMSException
- {
- super(destination, session, selector, noLocal);
- setQueueConsumer(true);
- }
-
- protected Receiver createClientReceiver() throws JMSException
- {
- try
- {
- return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()));
- }
- catch (ConnectionErrorException e)
- {
- throw new JMSException(e.getMessage(), e.getRemoteError().getCondition().toString());
- }
- }
-
- public Queue getQueue() throws JMSException
- {
- return (QueueImpl) getDestination();
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import javax.jms.JMSException;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+
+public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
+{
+ QueueReceiverImpl(final QueueImpl destination,
+ final SessionImpl session,
+ final String selector,
+ final boolean noLocal)
+ throws JMSException
+ {
+ super(destination, session, selector, noLocal);
+ setQueueConsumer(true);
+ }
+
+ protected Receiver createClientReceiver() throws JMSException
+ {
+ try
+ {
+ return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()));
+ }
+ catch (ConnectionErrorException e)
+ {
+ throw new JMSException(e.getMessage(), e.getRemoteError().getCondition().toString());
+ }
+ }
+
+ public Queue getQueue() throws JMSException
+ {
+ return (QueueImpl) getDestination();
+ }
+
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java Wed Sep 25 14:33:06 2013
@@ -1,36 +1,36 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.QueueSender;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-public class QueueSenderImpl extends MessageProducerImpl implements QueueSender
-{
- protected QueueSenderImpl(final Destination destination, final SessionImpl session)
- throws JMSException
- {
- super(destination, session);
- }
-
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+public class QueueSenderImpl extends MessageProducerImpl implements QueueSender
+{
+ protected QueueSenderImpl(final Destination destination, final SessionImpl session)
+ throws JMSException
+ {
+ super(destination, session);
+ }
+
+
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSenderImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java Wed Sep 25 14:33:06 2013
@@ -1,57 +1,57 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.QueueSession;
-
-import javax.jms.JMSException;
-import javax.jms.Queue;
-
-public class QueueSessionImpl extends SessionImpl implements QueueSession
-{
- protected QueueSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
- throws JMSException
- {
- super(connection, acknowledgeMode);
- setQueueSession(true);
- }
-
- public QueueReceiverImpl createReceiver(final Queue queue) throws JMSException
- {
- return createReceiver(queue, null);
- }
-
- public QueueReceiverImpl createReceiver(final Queue queue, final String selector) throws JMSException
- {
- // TODO - assert queue is a queueimpl and throw relevant JMS Exception
- final QueueReceiverImpl messageConsumer;
- synchronized(getClientSession().getEndpoint().getLock())
- {
- messageConsumer = new QueueReceiverImpl((QueueImpl)queue, this, selector, false);
- addConsumer(messageConsumer);
- }
- return messageConsumer;
-
- }
-
- public QueueSenderImpl createSender(final Queue queue) throws JMSException
- {
- return null; //TODO
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.QueueSession;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+public class QueueSessionImpl extends SessionImpl implements QueueSession
+{
+ protected QueueSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
+ throws JMSException
+ {
+ super(connection, acknowledgeMode);
+ setQueueSession(true);
+ }
+
+ public QueueReceiverImpl createReceiver(final Queue queue) throws JMSException
+ {
+ return createReceiver(queue, null);
+ }
+
+ public QueueReceiverImpl createReceiver(final Queue queue, final String selector) throws JMSException
+ {
+ // TODO - assert queue is a queueimpl and throw relevant JMS Exception
+ final QueueReceiverImpl messageConsumer;
+ synchronized(getClientSession().getEndpoint().getLock())
+ {
+ messageConsumer = new QueueReceiverImpl((QueueImpl)queue, this, selector, false);
+ addConsumer(messageConsumer);
+ }
+ return messageConsumer;
+
+ }
+
+ public QueueSenderImpl createSender(final Queue queue) throws JMSException
+ {
+ return null; //TODO
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org