You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flex.apache.org by cd...@apache.org on 2017/04/16 22:32:00 UTC
[09/72] [abbrv] [partial] flex-blazeds git commit: - Major code scrub
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSAdapter.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSAdapter.java b/core/src/flex/messaging/services/messaging/adapters/JMSAdapter.java
deleted file mode 100644
index d04539f..0000000
--- a/core/src/flex/messaging/services/messaging/adapters/JMSAdapter.java
+++ /dev/null
@@ -1,1114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging.adapters;
-
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.jms.JMSException;
-import javax.naming.Context;
-
-import flex.management.runtime.messaging.services.messaging.adapters.JMSAdapterControl;
-import flex.messaging.Destination;
-import flex.messaging.MessageClient;
-import flex.messaging.MessageClientListener;
-import flex.messaging.MessageDestination;
-import flex.messaging.MessageException;
-import flex.messaging.config.ConfigMap;
-import flex.messaging.config.ConfigurationException;
-import flex.messaging.log.Log;
-import flex.messaging.log.LogCategories;
-import flex.messaging.messages.CommandMessage;
-import flex.messaging.messages.ErrorMessage;
-import flex.messaging.messages.Message;
-import flex.messaging.messages.MessagePerformanceInfo;
-import flex.messaging.messages.MessagePerformanceUtils;
-import flex.messaging.services.MessageService;
-import flex.messaging.services.messaging.adapters.JMSSettings.DeliverySettings;
-
-/**
- * This adapter for the MessageService integrates Flex messaging
- * with Java Message Service destinations.
- */
-public class JMSAdapter extends MessagingAdapter implements JMSConfigConstants, JMSExceptionListener, JMSMessageListener, MessageClientListener
-{
- // Log category under which this adapter writes its log messages.
- public static final String LOG_CATEGORY = LogCategories.SERVICE_MESSAGE_JMS;
- // Prefix prepended to a client id to build a durable subscription name.
- private static final String DURABLE_SUBSCRIBER_NAME_PREFIX = "FlexClient_";
-
- // Note that clientId is kept track as Object (instead of String) in all of
- // these data structures because in clustering, clientId is not a String,
- // it's an instance of org.jgroups.stack.IpAddress instead.
- // Reverse lookup: JMS consumer -> client id it was created for.
- private Map<JMSConsumer, Object> consumerToClientId;
- // MessageClients registered by client id once a matching consumer exists.
- private Map<Object, MessageClient> messageClients;
- // Pool of topic producers; accessed under synchronized (topicProducers) in invoke().
- private LinkedList<JMSProducer> topicProducers;
- // Topic consumers keyed by client id.
- private Map<Object, JMSConsumer> topicConsumers;
- // Pool of queue producers; accessed under synchronized (queueProducers) in invoke().
- private LinkedList<JMSProducer> queueProducers;
- // Queue consumers keyed by client id.
- private Map<Object, JMSConsumer> queueConsumers;
-
- // JMSAdapter properties
- // JMS configuration; populated by initialize() or replaced via setJMSSettings().
- private JMSSettings settings;
- // MBean control created in setupAdapterControl().
- private JMSAdapterControl controller;
-
- //--------------------------------------------------------------------------
- //
- // Constructor
- //
- //--------------------------------------------------------------------------
-
-    /**
-     * Creates a <code>JMSAdapter</code> with management disabled; equivalent
-     * to <code>new JMSAdapter(false)</code>.
-     */
-    public JMSAdapter()
-    {
-        this(false);
-    }
-
-    /**
-     * Creates a <code>JMSAdapter</code> with empty consumer/producer
-     * bookkeeping and default <code>JMSSettings</code>.
-     *
-     * @param enableManagement <code>true</code> if this adapter should have a
-     * corresponding MBean control for management; <code>false</code> otherwise.
-     */
-    public JMSAdapter(boolean enableManagement)
-    {
-        super(enableManagement);
-
-        // Default settings; may be overwritten by initialize() or setJMSSettings().
-        this.settings = new JMSSettings();
-
-        // Producer pools.
-        this.topicProducers = new LinkedList<JMSProducer>();
-        this.queueProducers = new LinkedList<JMSProducer>();
-
-        // Consumer and client bookkeeping, all keyed (or valued) by client id.
-        this.topicConsumers = new ConcurrentHashMap<Object, JMSConsumer>();
-        this.queueConsumers = new ConcurrentHashMap<Object, JMSConsumer>();
-        this.consumerToClientId = new ConcurrentHashMap<JMSConsumer, Object>();
-        this.messageClients = new ConcurrentHashMap<Object, MessageClient>();
-    }
-
- //--------------------------------------------------------------------------
- //
- // Initialize, validate, start, and stop methods.
- //
- //--------------------------------------------------------------------------
-
- /**
- * Initializes the <code>JMSAdapter</code> with the properties.
- *
- * @param id The id of the <code>JMSAdapter</code>.
- * @param properties Properties for the <code>JMSAdapter</code>.
- */
- @Override
- public void initialize(String id, ConfigMap properties)
- {
- super.initialize(id, properties);
-
- if (properties == null || properties.size() == 0)
- return;
-
- // JMS specific properties
- jms(properties);
- }
-
-    /**
-     * Checks that the <code>JMSAdapter</code> is ready to be started. Does
-     * nothing when the adapter is already valid; otherwise runs the superclass
-     * validation and then requires a connection factory, a destination JNDI
-     * name and a message type to be present in the settings.
-     */
-    @Override
-    protected void validate()
-    {
-        if (!isValid())
-        {
-            super.validate();
-
-            // JMS connection factory of message destinations with JMS Adapters must be specified.
-            if (settings.getConnectionFactory() == null)
-            {
-                ConfigurationException missingFactory = new ConfigurationException();
-                missingFactory.setMessage(JMSConfigConstants.MISSING_CONNECTION_FACTORY);
-                throw missingFactory;
-            }
-
-            // JNDI names for message destinations with JMS Adapters must be specified.
-            if (settings.getDestinationJNDIName() == null)
-            {
-                ConfigurationException missingJndiName = new ConfigurationException();
-                missingJndiName.setMessage(JMSConfigConstants.MISSING_DESTINATION_JNDI_NAME);
-                throw missingJndiName;
-            }
-
-            // Unsupported JMS Message Type ''{0}''. Valid values are javax.jms.TextMessage, javax.jms.ObjectMessage, and javax.jms.MapMessage.
-            if (settings.getMessageType() == null)
-            {
-                ConfigurationException invalidType = new ConfigurationException();
-                invalidType.setMessage(JMSConfigConstants.INVALID_JMS_MESSAGE_TYPE, new Object[] {null});
-                throw invalidType;
-            }
-        }
-    }
-
-    /**
-     * Starts the adapter unless it is already started.
-     */
-    @Override
-    public void start()
-    {
-        if (!isStarted())
-        {
-            super.start();
-
-            // Listen for newly created MessageClients so that JMS consumers
-            // can be associated with their message clients.
-            MessageClient.addMessageClientCreatedListener(this);
-        }
-    }
-
- /**
- * Stops the adapter.
- */
- @Override
- public void stop()
- {
- if (!isStarted())
- return;
-
- super.stop();
-
- stopConsumers(topicConsumers.values());
- stopConsumers(queueConsumers.values());
- }
-
- //--------------------------------------------------------------------------
- //
- // Public methods
- //
- //--------------------------------------------------------------------------
-
-    /**
-     * Sets the destination of the adapter. The supplied
-     * <code>Destination</code> is cast to <code>MessageDestination</code>
-     * before being handed to the superclass.
-     *
-     * @param destination The destination of the adapter.
-     */
-    @Override
-    public void setDestination(Destination destination)
-    {
-        super.setDestination((MessageDestination)destination);
-    }
-
-    /**
-     * Returns the <code>JMSSettings</code> currently used by this adapter.
-     *
-     * @return The <code>JMSSettings</code> of the <code>JMSAdapter</code>.
-     */
-    public JMSSettings getJMSSettings()
-    {
-        return this.settings;
-    }
-
-    /**
-     * Replaces the <code>JMSSettings</code> used by this adapter.
-     *
-     * @param jmsSettings The new <code>JMSSettings</code> of the <code>JMSAdapter</code>.
-     */
-    public void setJMSSettings(JMSSettings jmsSettings)
-    {
-        settings = jmsSettings;
-    }
-
-    /**
-     * Reports how many queue consumers this adapter currently tracks.
-     *
-     * @return The number of entries in the queue consumer map.
-     */
-    public int getQueueConsumerCount()
-    {
-        final int count = queueConsumers.size();
-        return count;
-    }
-
-    /**
-     * Returns the ids of all queue consumers.
-     *
-     * @return The string form of every client id that currently has a queue
-     * consumer; an empty array if there are none.
-     */
-    public String[] getQueueConsumerIds()
-    {
-        // Client ids are tracked as Object because they are not always Strings
-        // (see the note on the consumer maps), so copying the key set straight
-        // into a String[] could fail with an ArrayStoreException. Snapshot the
-        // keys and convert each one explicitly instead.
-        Object[] clientIds = queueConsumers.keySet().toArray();
-        String[] ids = new String[clientIds.length];
-        for (int i = 0; i < clientIds.length; i++)
-            ids[i] = String.valueOf(clientIds[i]);
-        return ids;
-    }
-
-    /**
-     * Reports how many topic consumers this adapter currently tracks.
-     *
-     * @return The number of entries in the topic consumer map.
-     */
-    public int getTopicConsumerCount()
-    {
-        final int count = topicConsumers.size();
-        return count;
-    }
-
-    /**
-     * Returns the ids of all topic consumers.
-     *
-     * @return The string form of every client id that currently has a topic
-     * consumer; an empty array if there are none.
-     */
-    public String[] getTopicConsumerIds()
-    {
-        // Client ids are tracked as Object because they are not always Strings
-        // (see the note on the consumer maps), so copying the key set straight
-        // into a String[] could fail with an ArrayStoreException. Snapshot the
-        // keys and convert each one explicitly instead.
-        Object[] clientIds = topicConsumers.keySet().toArray();
-        String[] ids = new String[clientIds.length];
-        for (int i = 0; i < clientIds.length; i++)
-            ids[i] = String.valueOf(clientIds[i]);
-        return ids;
-    }
-
-    /**
-     * Reports how many topic producers are currently held in the pool.
-     *
-     * @return The size of the topic producer pool.
-     */
-    public int getTopicProducerCount()
-    {
-        final int count = topicProducers.size();
-        return count;
-    }
-
-    /**
-     * Reports how many queue producers are currently held in the pool.
-     *
-     * @return The size of the queue producer pool.
-     */
-    public int getQueueProducerCount()
-    {
-        final int count = queueProducers.size();
-        return count;
-    }
-
- /**
- * Implements JMSExceptionListener.
- * When a JMSConsumer receives a JMS exception from its underlying JMS
- * connection, it dispatches a JMS exception event to pass the exception
- * to the JMS adapter.
- *
- * @param evt The <code>JMSExceptionEvent</code>.
- */
- public void exceptionThrown(JMSExceptionEvent evt)
- {
- JMSConsumer consumer = (JMSConsumer)evt.getSource();
- JMSException jmsEx = evt.getJMSException();
-
- // Client is unsubscribed because its corresponding JMS consumer for JMS destination ''{0}'' encountered an error during message delivery: {1}
- MessageException messageEx = new MessageException();
- messageEx.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_MESSAGE_DELIVERY_ERROR, new Object[] {consumer.getDestinationJndiName(), jmsEx.getMessage()});
- removeConsumer(consumer, true, true, messageEx.createErrorMessage());
- }
-
-    /**
-     * Indicates that the JMS adapter manages its own subscriptions.
-     *
-     * @return Always <code>true</code>.
-     */
-    @Override
-    public boolean handlesSubscriptions()
-    {
-        final boolean adapterManagesSubscriptions = true;
-        return adapterManagesSubscriptions;
-    }
-
-    /**
-     * Publish a message to this adapter's JMS destination.
-     * <p>
-     * The message's time-to-live is added to its headers, a producer is taken
-     * from the pool that matches the configured destination type (a new one
-     * is created and started while the pool is below the configured maximum;
-     * otherwise the oldest one is rotated to the back and reused) and the
-     * message is sent with it. A producer whose send fails with a
-     * <code>JMSException</code> is stopped and dropped from its pool.
-     * If the destination type is neither topic nor queue nothing is sent.
-     *
-     * @param message The Flex message to publish.
-     * @return The body of the acknowledge message, which is null in this case.
-     * @throws MessageException if a producer cannot be created/started or the
-     * send fails.
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public Object invoke(Message message)
-    {
-        // named Flex message props become JMS headers
-        Map msgProps = message.getHeaders();
-        msgProps.put(JMSConfigConstants.TIME_TO_LIVE, Long.valueOf(message.getTimeToLive()));
-
-        // Select the producer pool for the configured destination type.
-        boolean topic = settings.getDestinationType().equals(TOPIC);
-        LinkedList<JMSProducer> pool = null;
-        if (topic)
-            pool = topicProducers;
-        else if (settings.getDestinationType().equals(QUEUE))
-            pool = queueProducers;
-
-        // Neither topic nor queue: there is no producer to send with.
-        if (pool == null)
-            return null;
-
-        JMSProducer producer;
-        synchronized (pool)
-        {
-            if (pool.size() < settings.getMaxProducers())
-            {
-                if (topic)
-                    producer = new JMSTopicProducer();
-                else
-                    producer = new JMSQueueProducer();
-
-                try
-                {
-                    producer.initialize(settings);
-                    producer.start();
-                }
-                catch (Exception e)
-                {
-                    // The producer never reaches the pool when it fails to start.
-                    throw constructMessageException(e);
-                }
-            }
-            else
-            {
-                producer = pool.removeFirst();
-            }
-
-            // Most recently used producer goes to the back of the pool.
-            pool.addLast(producer);
-        }
-
-        try
-        {
-            producer.sendMessage(message);
-        }
-        catch (JMSException jmsEx)
-        {
-            // At this point we give up on this producer, so we just
-            // stop and remove it from the pool it was taken from.
-            synchronized (pool)
-            {
-                producer.stop();
-                pool.remove(producer);
-            }
-
-            throw constructMessageException(jmsEx);
-        }
-
-        return null;
-    }
-
-    /**
-     * Handle a CommandMessage sent by this adapter's service.
-     * <p>
-     * A subscribe operation creates a JMS consumer for the client id of the
-     * command (topic or queue, depending on the configured destination type).
-     * An unsubscribe operation removes the client's consumer; the durable
-     * subscription is kept when the command carries a true
-     * <code>SUBSCRIPTION_INVALIDATED_HEADER</code> or a true
-     * <code>PRESERVE_DURABLE_HEADER</code>. Other operations are ignored.
-     *
-     * @param commandMessage The command message to manage.
-     * @return The result of manage which is null in this case.
-     */
-    @Override
-    public Object manage(CommandMessage commandMessage)
-    {
-        Object clientId = commandMessage.getClientId();
-
-        if (commandMessage.getOperation() == CommandMessage.SUBSCRIBE_OPERATION)
-        {
-            // Keep track of the selector expression.
-            Object selectorExpression = commandMessage.getHeaders().get(CommandMessage.SELECTOR_HEADER);
-
-            // Create a JMSConsumer for this destination and associate it with the client id
-            if (settings.getDestinationType().equals(TOPIC))
-                subscribeConsumer(clientId, selectorExpression, true);
-            else if (settings.getDestinationType().equals(QUEUE))
-                subscribeConsumer(clientId, selectorExpression, false);
-        }
-        else if (commandMessage.getOperation() == CommandMessage.UNSUBSCRIBE_OPERATION)
-        {
-            Object preserveHeader = commandMessage.getHeader(CommandMessage.PRESERVE_DURABLE_HEADER);
-            boolean preserveDurable = preserveHeader != null && ((Boolean)preserveHeader).booleanValue();
-
-            Object invalidatedHeader = commandMessage.getHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER);
-            boolean subscriptionInvalidated = invalidatedHeader != null && ((Boolean)invalidatedHeader).booleanValue();
-
-            // Don't destroy a durable subscription if the MessageClient's session has been invalidated
-            // or this is a JMS durable connection that has requested to be preserved.
-            boolean unsubscribe = !(subscriptionInvalidated || preserveDurable);
-
-            removeConsumer(clientId, unsubscribe, false, null);
-        }
-
-        // CommandMessage.POLL_OPERATION handling is left to the Endpoint
-        // hence not handled by this adapter.
-
-        return null;
-    }
-
-    /**
-     * Creates, configures and registers a JMS consumer for the client id.
-     *
-     * @param clientId The client id the consumer is created for.
-     * @param selectorExpression The selector expression header of the
-     * subscribe command, or null if none was sent.
-     * @param topic <code>true</code> to create a topic consumer,
-     * <code>false</code> to create a queue consumer.
-     */
-    private void subscribeConsumer(Object clientId, Object selectorExpression, boolean topic)
-    {
-        Map<Object, JMSConsumer> consumers = topic ? topicConsumers : queueConsumers;
-
-        MessageClient existingMessageClient = null;
-        // This could happen when client disconnects without unsubscribing first.
-        if (consumers.containsKey(clientId))
-        {
-            removeConsumer(clientId, true /*unsubscribe*/, false /*invalidate*/, null);
-            existingMessageClient = messageClients.get(clientId);
-        }
-
-        // Create the consumer.
-        JMSConsumer consumer;
-        if (topic)
-            consumer = new JMSTopicConsumer();
-        else
-            consumer = new JMSQueueConsumer();
-        consumer.initialize(settings);
-        if (selectorExpression != null)
-            consumer.setSelectorExpression((String)selectorExpression);
-        // Need to build a subscription name, in case durable subscriptions are used.
-        if (topic)
-            ((JMSTopicConsumer)consumer).setDurableSubscriptionName(buildSubscriptionName(clientId));
-        consumer.setMessageReceiver(buildMessageReceiver(consumer));
-
-        // Add JMSAdapter as JMS exception and message listener.
-        consumer.addJMSExceptionListener(this);
-        consumer.addJMSMessageListener(this);
-        consumers.put(clientId, consumer);
-        consumerToClientId.put(consumer, clientId);
-
-        // Means client was disconnected without unsubscribing, hence no
-        // new message client will be created. Make sure the old one is
-        // wired up with the new JMS consumer properly.
-        if (existingMessageClient != null)
-            messageClientCreated(existingMessageClient);
-    }
-
-    /**
-     * Implements MessageClientListener.
-     * When a MessageClient is created, the adapter looks for a JMSConsumer
-     * registered under the same clientId (topic consumers first, then queue
-     * consumers). If one exists, the MessageClient is recorded, the consumer
-     * is started and the adapter registers itself as a destroyed listener on
-     * the MessageClient. If that fails, the consumer is removed and its
-     * MessageClient invalidated with the resulting error message.
-     *
-     * @param messageClient The newly created MessageClient.
-     */
-    public void messageClientCreated(MessageClient messageClient)
-    {
-        Object clientId = messageClient.getClientId();
-
-        JMSConsumer consumer = topicConsumers.containsKey(clientId)
-                ? topicConsumers.get(clientId)
-                : (queueConsumers.containsKey(clientId) ? queueConsumers.get(clientId) : null);
-
-        // No JMSConsumer was created for this clientId; nothing to wire up.
-        if (consumer == null)
-            return;
-
-        messageClients.put(clientId, messageClient);
-        try
-        {
-            consumer.start();
-            // Add JMS adapter as a client destroyed listener, so client
-            // invalidation (eg. due to session timeout) can be handled properly.
-            messageClient.addMessageClientDestroyedListener(this);
-        }
-        catch (Exception ex)
-        {
-            // A MessageException is reported as is; anything else is wrapped first.
-            MessageException failure = (ex instanceof MessageException)
-                    ? (MessageException)ex
-                    : constructMessageException(ex);
-            removeConsumer(consumer, true, true, failure.createErrorMessage());
-        }
-    }
-
-    /**
-     * Implements MessageClientListener.
-     * Removes the JMS consumer registered for the destroyed MessageClient's
-     * clientId (a no-op if it has already been removed, e.g. because the
-     * client unsubscribed first) and then forgets the MessageClient itself.
-     *
-     * @param messageClient The MessageClient that was destroyed.
-     */
-    public void messageClientDestroyed(MessageClient messageClient)
-    {
-        final Object destroyedClientId = messageClient.getClientId();
-
-        // Consumer first, then the client registration.
-        removeConsumer(destroyedClientId);
-        messageClients.remove(destroyedClientId);
-    }
-
-    /**
-     * Implements JMSMessageListener.
-     * Called with the JMS message event a JMSConsumer dispatches. The JMS
-     * message is converted to a Flex message and, if the conversion produced
-     * one, handed to the destination's MessageService.
-     *
-     * @param evt The <code>JMSMessageEvent</code>.
-     */
-    public void messageReceived(JMSMessageEvent evt)
-    {
-        JMSConsumer source = (JMSConsumer)evt.getSource();
-        flex.messaging.messages.AsyncMessage converted = convertToFlexMessage(evt.getJMSMessage(), source);
-
-        // Conversion yields null when the consumer has no known client id.
-        if (converted == null)
-            return;
-
-        MessagePerformanceUtils.markServerPostAdapterExternalTime(converted);
-        MessageService service = (MessageService)getDestination().getService();
-        service.serviceMessageFromAdapter(converted, false);
-    }
-
-    /**
-     * Removes (unsubscribes) the consumer registered for the client id. The
-     * durable subscription is unsubscribed and the MessageClient is
-     * invalidated with a generic "consumer removed" error message.
-     *
-     * @param clientId The identifier for the consumer to remove.
-     */
-    public void removeConsumer(Object clientId)
-    {
-        // Client is unsubscribed because its corresponding JMS consumer has been removed from the JMS adapter.
-        MessageException removalNotice = new MessageException();
-        removalNotice.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_REMOVAL);
-
-        ErrorMessage errorMessage = removalNotice.createErrorMessage();
-        removeConsumer(clientId, true, true, errorMessage);
-    }
-
- //--------------------------------------------------------------------------
- //
- // Protected and Private methods
- //
- //--------------------------------------------------------------------------
-
-    /**
-     * Looks up the JMSConsumer registered for the clientId (topic consumers
-     * first, then queue consumers) and removes it.
-     *
-     * @param clientId The clientId associated with the JMSConsumer to remove.
-     * @param unsubscribe Whether to unsubscribe the durable subscription or not.
-     * @param invalidate Whether to invalidate the MessageClient or not.
-     * @param invalidateMessage A message to push to the client before consumer
-     * is removed and its MessageClient is invalidated. If the message is null,
-     * MessageClient is invalidated silently.
-     */
-    protected void removeConsumer(Object clientId, boolean unsubscribe, boolean invalidate, ErrorMessage invalidateMessage)
-    {
-        JMSConsumer registered = topicConsumers.containsKey(clientId)
-                ? topicConsumers.get(clientId)
-                : (queueConsumers.containsKey(clientId) ? queueConsumers.get(clientId) : null);
-
-        // A null consumer is tolerated by the consumer based overload.
-        removeConsumer(registered, unsubscribe, invalidate, invalidateMessage);
-    }
-
-    /**
-     * Removes (unsubscribes) the specified consumer: detaches this adapter as
-     * its listener, stops it, optionally invalidates its MessageClient and
-     * finally drops it from the adapter's bookkeeping. Does nothing when the
-     * consumer is null or is not associated with a client id.
-     *
-     * @param consumer The JMSConsumer instance to remove.
-     * @param unsubscribe Whether to unsubscribe the durable subscription or not.
-     * @param invalidate Whether to invalidate the MessageClient or not.
-     * @param invalidateMessage A message to push to the client before consumer
-     * is removed and its MessageClient is invalidated. If the message is null,
-     * MessageClient is invalidated silently.
-     */
-    protected void removeConsumer(JMSConsumer consumer, boolean unsubscribe, boolean invalidate, ErrorMessage invalidateMessage)
-    {
-        Object clientId = (consumer == null) ? null : consumerToClientId.get(consumer);
-        if (clientId == null)
-            return;
-
-        if (Log.isInfo())
-        {
-            StringBuilder logMessage = new StringBuilder("JMS consumer for JMS destination '");
-            logMessage.append(consumer.getDestinationJndiName());
-            logMessage.append("' is being removed from the JMS adapter");
-
-            if (invalidateMessage != null)
-            {
-                logMessage.append(" due to the following error: ");
-                logMessage.append(invalidateMessage.faultString);
-            }
-
-            Log.getLogger(JMSAdapter.LOG_CATEGORY).info(logMessage.toString());
-        }
-
-        // Detach and stop the consumer before touching the MessageClient.
-        consumer.removeJMSExceptionListener(this);
-        consumer.removeJMSMessageListener(this);
-        consumer.stop(unsubscribe);
-
-        // Invalidation looks the client id up through consumerToClientId, so
-        // it has to happen before the bookkeeping below is cleared.
-        if (invalidate)
-            invalidateMessageClient(consumer, invalidateMessage);
-
-        // Anything that is not a topic consumer is assumed to be a JMSQueueConsumer.
-        Map<Object, JMSConsumer> owningMap = (consumer instanceof JMSTopicConsumer) ? topicConsumers : queueConsumers;
-        owningMap.remove(clientId);
-        consumerToClientId.remove(consumer);
-        // Message client will be removed in messageClientDestroyed.
-    }
-
-    /**
-     * Creates the MBean control of this <code>JMSAdapter</code> as a child of
-     * the destination's control, registers it and installs it as the
-     * adapter's control.
-     *
-     * @param destination The <code>Destination</code> that manages this <code>JMSAdapter</code>.
-     */
-    @Override
-    protected void setupAdapterControl(Destination destination)
-    {
-        JMSAdapterControl adapterControl = new JMSAdapterControl(this, destination.getControl());
-        this.controller = adapterControl;
-        adapterControl.register();
-        setControl(adapterControl);
-    }
-
-    /**
-     * Creates the MessageReceiver for a JMSConsumer according to the
-     * configured DeliverySettings: an asynchronous receiver when the delivery
-     * mode is ASYNC, otherwise a synchronous receiver configured with the
-     * sync receive interval and wait times.
-     *
-     * @param consumer The <code>JMSConsumer</code>.
-     * @return MessageReceiver configured for JMSConsumer per DeliverySettings.
-     */
-    private MessageReceiver buildMessageReceiver(JMSConsumer consumer)
-    {
-        DeliverySettings delivery = settings.getDeliverySettings();
-
-        MessageReceiver receiver;
-        if (delivery.getMode().equals(JMSConfigConstants.ASYNC))
-        {
-            receiver = new AsyncMessageReceiver(consumer);
-        }
-        else
-        {
-            SyncMessageReceiver syncReceiver = new SyncMessageReceiver(consumer);
-            syncReceiver.setSyncReceiveIntervalMillis(delivery.getSyncReceiveIntervalMillis());
-            syncReceiver.setSyncReceiveWaitMillis(delivery.getSyncReceiveWaitMillis());
-            receiver = syncReceiver;
-        }
-        return receiver;
-    }
-
-    /**
-     * Builds the subscription name used by JMSConsumers with durable
-     * connections: DURABLE_SUBSCRIBER_NAME_PREFIX followed by the string form
-     * of the clientId.
-     */
-    private String buildSubscriptionName(Object clientId)
-    {
-        String clientIdText = clientId.toString();
-        return DURABLE_SUBSCRIBER_NAME_PREFIX + clientIdText;
-    }
-
-    /**
-     * Wraps an exception caught during a JMS invocation in a MessageException.
-     *
-     * @param e the Exception caught in the JMS invocation
-     * @return a MessageException whose message is built from
-     * JMSINVOCATION_EXCEPTION and the message of <code>e</code>
-     */
-    private MessageException constructMessageException(Exception e)
-    {
-        Object[] messageArgs = new Object[] { e.getMessage() };
-        MessageException wrapped = new MessageException();
-        wrapped.setMessage(JMSINVOCATION_EXCEPTION, messageArgs);
-        return wrapped;
-    }
-
-    /**
-     * Convert from a <code>javax.jms.Message</code> type to the <code>flex.messaging.messages.AsyncMessage</code> type.
-     * Supported types are <code>javax.jms.TextMessage</code>, <code>javax.jms.ObjectMessage</code>,
-     * and <code>javax.jms.MapMessage</code>; for any other type the Flex message body is left unset.
-     * <p>
-     * JMS errors raised while reading the message id, timestamp, headers,
-     * properties or body are logged as warnings and the affected part is
-     * skipped; they do not abort the conversion.
-     *
-     * @param jmsMessage The JMS message to convert.
-     * @param consumer The JMSConsumer that received the message; used to look
-     * up the client id the Flex message is addressed to.
-     * @return The converted Flex message, or null if no client id is
-     * registered for the consumer.
-     */
-    private flex.messaging.messages.AsyncMessage convertToFlexMessage(javax.jms.Message jmsMessage, JMSConsumer consumer)
-    {
-        // Resolve the client id first so that nothing is built for a consumer
-        // that is no longer registered.
-        Object clientId = consumerToClientId.get(consumer);
-        if (clientId == null)
-        {
-            if (Log.isWarn())
-                Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered a null clientId during JMS to Flex message conversion");
-
-            return null;
-        }
-
-        flex.messaging.messages.AsyncMessage flexMessage = new flex.messaging.messages.AsyncMessage();
-        flexMessage.setClientId(clientId);
-        flexMessage.setDestination(getDestination().getId());
-
-        // Set JMSMessageID header as Flex messageId property.
-        try
-        {
-            flexMessage.setMessageId(jmsMessage.getJMSMessageID());
-        }
-        catch (JMSException jmsEx)
-        {
-            if (Log.isWarn())
-                Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS message id during JMS to Flex message conversion: " + jmsEx.getMessage());
-        }
-
-        // Set JMSTimestamp header as Flex timestamp property.
-        try
-        {
-            flexMessage.setTimestamp(jmsMessage.getJMSTimestamp());
-        }
-        catch (JMSException jmsEx)
-        {
-            if (Log.isWarn())
-                Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS timestamp during JMS to Flex message conversion: " + jmsEx.getMessage());
-        }
-
-        // Set JMS headers and Flex headers.
-        if (settings.isPreserveJMSHeaders())
-        {
-            // Set standard JMS headers except JMSMessageId and JMSTimestamp,
-            // as they are already set on the Flex message directly.
-            try
-            {
-                flexMessage.setHeader(JMS_CORRELATION_ID, jmsMessage.getJMSCorrelationID());
-                flexMessage.setHeader(JMS_DELIVERY_MODE, Integer.toString(jmsMessage.getJMSDeliveryMode()));
-                // The destination header may be absent; a null must not abort
-                // the conversion with a NullPointerException.
-                javax.jms.Destination jmsDestination = jmsMessage.getJMSDestination();
-                flexMessage.setHeader(JMS_DESTINATION, jmsDestination != null ? jmsDestination.toString() : null);
-                flexMessage.setHeader(JMS_EXPIRATION, Long.toString(jmsMessage.getJMSExpiration()));
-                flexMessage.setHeader(JMS_PRIORITY, Integer.toString(jmsMessage.getJMSPriority()));
-                flexMessage.setHeader(JMS_REDELIVERED, Boolean.toString(jmsMessage.getJMSRedelivered()));
-                flexMessage.setHeader(JMS_REPLY_TO, jmsMessage.getJMSReplyTo());
-                flexMessage.setHeader(JMS_TYPE, jmsMessage.getJMSType());
-            }
-            catch (JMSException jmsEx)
-            {
-                // These should not cause errors to be pushed to Flash clients
-                if (Log.isWarn())
-                    Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS headers during JMS to Flex conversion: " + jmsEx.getMessage());
-            }
-        }
-
-        // Set JMS properties as Flex headers.
-
-        // While iterating through JMS message properties, build a message
-        // performance object to send back to the client with the message
-        // properties starting with MPI_HEADER_IN (if any).
-        MessagePerformanceInfo mpi = null;
-
-        try
-        {
-            for (Enumeration propEnum = jmsMessage.getPropertyNames(); propEnum.hasMoreElements();)
-            {
-                String propName = (String)propEnum.nextElement();
-                try
-                {
-                    Object propValue = jmsMessage.getObjectProperty(propName);
-                    if (propName.startsWith(MessagePerformanceUtils.MPI_HEADER_IN))
-                    {
-                        if (mpi == null)
-                            mpi = new MessagePerformanceInfo();
-                        // The remainder of the property name is the name of
-                        // the public MessagePerformanceInfo field to populate.
-                        propName = propName.substring(MessagePerformanceUtils.MPI_HEADER_IN.length());
-                        try
-                        {
-                            java.lang.reflect.Field field = mpi.getClass().getField(propName);
-                            field.set(mpi, propValue);
-                        }
-                        catch (Exception ignored)
-                        {
-                            // Simply don't set the property if the field cannot be found or assigned.
-                        }
-                    }
-                    else
-                    {
-                        flexMessage.setHeader(propName, propValue);
-                    }
-                }
-                catch (JMSException jmsEx)
-                {
-                    // A single unreadable property is skipped; the rest are still copied.
-                    if (Log.isWarn())
-                        Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS properties during JMS to Flex conversion: " + jmsEx.getMessage());
-                }
-            }
-
-            if (mpi != null)
-                flexMessage.setHeader(MessagePerformanceUtils.MPI_HEADER_IN, mpi);
-        }
-        catch (JMSException jmsEx)
-        {
-            if (Log.isWarn())
-                Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS properties during JMS to Flex conversion: " + jmsEx.getMessage());
-        }
-
-        // Finally, set the JMS message body as the Flex message body.
-        try
-        {
-            if (jmsMessage instanceof javax.jms.TextMessage)
-            {
-                javax.jms.TextMessage textMessage = (javax.jms.TextMessage)jmsMessage;
-                flexMessage.setBody(textMessage.getText());
-            }
-            else if (jmsMessage instanceof javax.jms.ObjectMessage)
-            {
-                javax.jms.ObjectMessage objMessage = (javax.jms.ObjectMessage)jmsMessage;
-                flexMessage.setBody(objMessage.getObject());
-            }
-            else if (jmsMessage instanceof javax.jms.MapMessage)
-            {
-                javax.jms.MapMessage mapMessage = (javax.jms.MapMessage)jmsMessage;
-                @SuppressWarnings("unchecked")
-                Enumeration names = mapMessage.getMapNames();
-                Map<String, Object> body = new HashMap<String, Object>();
-                while (names.hasMoreElements())
-                {
-                    String name = (String)names.nextElement();
-                    body.put(name, mapMessage.getObject(name));
-                }
-                flexMessage.setBody(body);
-            }
-        }
-        catch (JMSException jmsEx)
-        {
-            if (Log.isWarn())
-                Log.getLogger(LOG_CATEGORY).warn("JMSAdapter encountered an error while retrieving JMS message body during JMS to Flex conversion: " + jmsEx.getMessage());
-        }
-
-        return flexMessage;
-    }
-
-    /**
-     * Invalidates the MessageClient associated with the JMSConsumer with the
-     * supplied error message. Does nothing when the consumer has no client id
-     * or no MessageClient is registered for that client id.
-     *
-     * @param consumer The JMSConsumer whose MessageClient will be invalidated.
-     * @param message The error message to push out before invalidating the
-     * MessageClient. If the message is null, MessageClient is invalidated
-     * silently.
-     */
-    private void invalidateMessageClient(JMSConsumer consumer, flex.messaging.messages.Message message)
-    {
-        Object clientId = consumerToClientId.get(consumer);
-        if (clientId == null)
-            return;
-
-        // Single lookup instead of containsKey() followed by get(): the entry
-        // can be removed by another thread between the two calls, which would
-        // leave a null MessageClient to be dereferenced below.
-        MessageClient messageClient = messageClients.get(clientId);
-        if (messageClient == null)
-            return;
-
-        if (Log.isInfo())
-            Log.getLogger(JMSAdapter.LOG_CATEGORY).info("The corresponding MessageClient for JMS consumer for JMS destination '"
-                    + consumer.getDestinationJndiName() + "' is being invalidated");
-
-        messageClient.invalidate(message);
-    }
-
- /**
- * Handle JMS specific configuration.
- */
- private void jms(ConfigMap properties)
- {
- ConfigMap jms = properties.getPropertyAsMap(JMS, null);
- if (jms != null)
- {
- String destType = jms.getPropertyAsString(DESTINATION_TYPE, defaultDestinationType);
- settings.setDestinationType(destType);
-
- String msgType = jms.getPropertyAsString(MESSAGE_TYPE, null);
- settings.setMessageType(msgType);
-
- String factory = jms.getPropertyAsString(CONNECTION_FACTORY, null);
- settings.setConnectionFactory(factory);
-
- ConfigMap connectionCredentials = jms.getPropertyAsMap(CONNECTION_CREDENTIALS, null);
- if (connectionCredentials != null)
- {
- String username = connectionCredentials.getPropertyAsString(USERNAME, null);
- settings.setConnectionUsername(username);
- String password = connectionCredentials.getPropertyAsString(PASSWORD, null);
- settings.setConnectionPassword(password);
- }
-
- ConfigMap deliverySettings = jms.getPropertyAsMap(DELIVERY_SETTINGS, null);
- if (deliverySettings != null)
- {
- // Get the default delivery settings.
- DeliverySettings ds = settings.getDeliverySettings();
-
- String mode = deliverySettings.getPropertyAsString(MODE, JMSConfigConstants.defaultMode);
- ds.setMode(mode);
-
- long receiveIntervalMillis = deliverySettings.getPropertyAsLong(SYNC_RECEIVE_INTERVAL_MILLIS, defaultSyncReceiveIntervalMillis);
- ds.setSyncReceiveIntervalMillis(receiveIntervalMillis);
-
- long receiveWaitMillis = deliverySettings.getPropertyAsLong(SYNC_RECEIVE_WAIT_MILLIS, defaultSyncReceiveWaitMillis);
- ds.setSyncReceiveWaitMillis(receiveWaitMillis);
- }
-
- String destJNDI = jms.getPropertyAsString(DESTINATION_JNDI_NAME, null);
- settings.setDestinationJNDIName(destJNDI);
-
- String dest = jms.getPropertyAsString(DESTINATION_NAME, null);
- if (dest != null && Log.isWarn())
- Log.getLogger(LOG_CATEGORY).warn("The <destination-name> configuration option is deprecated and non-functional. Please remove this from your configuration file.");
-
- boolean durable = getDestination() instanceof MessageDestination ?
- ((MessageDestination) getDestination()).getServerSettings().isDurable() : false;
- settings.setDurableConsumers(durable);
-
- String deliveryMode = jms.getPropertyAsString(DELIVERY_MODE, null);
- settings.setDeliveryMode(deliveryMode);
-
- boolean preserveJMSHeaders = jms.getPropertyAsBoolean(PRESERVE_JMS_HEADERS, settings.isPreserveJMSHeaders());
- settings.setPreserveJMSHeaders(preserveJMSHeaders);
-
- String defPriority = jms.getPropertyAsString(MESSAGE_PRIORITY, null);
- if (defPriority != null && !defPriority.equalsIgnoreCase(DEFAULT_PRIORITY))
- {
- int priority = jms.getPropertyAsInt(MESSAGE_PRIORITY, settings.getMessagePriority());
- settings.setMessagePriority(priority);
- }
-
- String ackMode = jms.getPropertyAsString(ACKNOWLEDGE_MODE, defaultAcknowledgeMode);
- settings.setAcknowledgeMode(ackMode);
-
- boolean transMode = jms.getPropertyAsBoolean(TRANSACTION_MODE, false);
- if (transMode && Log.isWarn())
- Log.getLogger(LOG_CATEGORY).warn("The <transacted-sessions> configuration option is deprecated and non-functional. Please remove this from your configuration file.");
-
- int maxProducers = jms.getPropertyAsInt(MAX_PRODUCERS, defaultMaxProducers);
- settings.setMaxProducers(maxProducers);
-
- // Retrieve any JNDI initial context environment properties.
- ConfigMap env = jms.getPropertyAsMap(INITIAL_CONTEXT_ENVIRONMENT, null);
- if (env != null)
- {
- List props = env.getPropertyAsList(PROPERTY, null);
- if (props != null)
- {
- Class contextClass = Context.class;
- Hashtable envProps = new Hashtable();
- for (Iterator iter = props.iterator(); iter.hasNext();)
- {
- Object prop = iter.next();
- if (prop instanceof ConfigMap)
- {
- ConfigMap pair = (ConfigMap)prop;
- String name = pair.getProperty(NAME);
- String value = pair.getProperty(VALUE);
- if (name == null || value == null)
- {
- // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination does not specify both <name> and <value> subelements.
- MessageException messageEx = new MessageException();
- messageEx.setMessage(MISSING_NAME_OR_VALUE, new Object[] {getDestination().getId()});
- throw messageEx;
- }
- // If the name is a Context field, use the
- // constant value rather than this literal name.
- if (name.startsWith("Context."))
- {
- String fieldName = name.substring(name.indexOf('.') + 1);
- java.lang.reflect.Field field = null;
- try
- {
- field = contextClass.getDeclaredField(fieldName);
- }
- catch (NoSuchFieldException nsfe)
- {
- // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination specifies an invalid javax.naming.Context field for its <name>: {1}
- MessageException messageEx = new MessageException();
- messageEx.setMessage(INVALID_CONTEXT_NAME, new Object[] {getDestination().getId(), fieldName});
- throw messageEx;
- }
- String fieldValue = null;
- try
- {
- fieldValue = (String)field.get(null);
- }
- catch (IllegalAccessException iae)
- {
- // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination specifies an inaccessible javax.naming.Context field for its <name>: {1}
- MessageException messageEx = new MessageException();
- messageEx.setMessage(INACCESIBLE_CONTEXT_NAME, new Object[] {getDestination().getId(), fieldName});
- throw messageEx;
- }
- envProps.put(fieldValue, value);
- }
- else
- {
- envProps.put(name, value);
- }
- }
- else
- {
- // A <property> element for the <initial-context-environment> settings for the ''{0}'' destination does not specify both <name> and <value> subelements.
- MessageException messageEx = new MessageException();
- messageEx.setMessage(MISSING_NAME_OR_VALUE, new Object[] {getDestination().getId()});
- throw messageEx;
- }
- }
- settings.setInitialContextEnvironment(envProps);
- }
- else
- {
- // The <initial-context-environment> settings for the ''{0}'' destination does not include any <property> subelements.
- MessageException messageEx = new MessageException();
- messageEx.setMessage(MISSING_PROPERTY_SUBELEMENT, new Object[] {getDestination().getId()});
- throw messageEx;
- }
- }
- }
- }
-
- private void stopConsumers(Collection<JMSConsumer> consumers)
- {
- Iterator<JMSConsumer> itr = consumers.iterator();
- while (itr.hasNext())
- {
- JMSConsumer consumer = itr.next();
- // Client is unsubscribed because its corresponding JMS consumer for JMS destination ''{0}'' has been stopped.
- MessageException me = new MessageException();
- me.setMessage(JMSConfigConstants.CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_STOP, new Object[] {consumer.getDestinationJndiName()});
- consumer.stop(true);
- invalidateMessageClient(consumer, me.createErrorMessage());
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSConfigConstants.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSConfigConstants.java b/core/src/flex/messaging/services/messaging/adapters/JMSConfigConstants.java
deleted file mode 100644
index e792eb6..0000000
--- a/core/src/flex/messaging/services/messaging/adapters/JMSConfigConstants.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging.adapters;
-
-/**
- * Constants for JMSAdapter and its related classes.
- *
- *
- */
-public interface JMSConfigConstants
-{
- // Values used in the messaging configuration
- String ASYNC = "async";
- String ACKNOWLEDGE_MODE = "acknowledge-mode";
- String AUTO_ACKNOWLEDGE = "auto_acknowledge";
- String CLIENT_ACKNOWLEDGE = "client_acknowledge";
- String CONNECTION_FACTORY = "connection-factory";
- String CONNECTION_CREDENTIALS = "connection-credentials";
- String DELIVERY_SETTINGS = "delivery-settings";
- String DEFAULT_DELIVERY_MODE = "default_delivery_mode";
- String DEFAULT_PRIORITY = "default-priority";
- String DELIVERY_MODE = "delivery-mode";
- String DESTINATION_JNDI_NAME = "destination-jndi-name";
- String DESTINATION_NAME = "destination-name";
- String DESTINATION_TYPE = "destination-type";
- String DUPS_OK_ACKNOWLEDGE = "dups_ok_acknowledge";
- String INITIAL_CONTEXT_ENVIRONMENT = "initial-context-environment";
- String JMS = "jms";
- String MAX_PRODUCERS = "max-producers";
- String MAP_MESSAGE = "javax.jms.MapMessage";
- String MESSAGE_TYPE = "message-type";
- String MESSAGE_PRIORITY = "message-priority";
- String MODE = "mode";
- String NAME = "name";
- String NON_PERSISTENT = "non_persistent";
- String OBJECT_MESSAGE = "javax.jms.ObjectMessage";
- String PASSWORD = "password";
- String PERSISTENT = "persistent";
- String PRESERVE_JMS_HEADERS = "preserve-jms-headers";
- String PROPERTY = "property";
- String SYNC = "sync";
- String SYNC_RECEIVE_INTERVAL_MILLIS = "sync-receive-interval-millis";
- String SYNC_RECEIVE_WAIT_MILLIS = "sync-receive-wait-millis";
- String QUEUE = "queue";
- String TEXT_MESSAGE = "javax.jms.TextMessage";
- String TOPIC = "topic";
- String TRANSACTION_MODE = "transacted-sessions";
- String USERNAME = "username";
- String VALUE = "value";
-
- // Flex message properties to translate to JMS
- String TIME_TO_LIVE = "timeToLive";
-
- // Standard JMS headers to translate to Flex
- String JMS_CORRELATION_ID = "JMSCorrelationID";
- String JMS_DELIVERY_MODE = "JMSDeliveryMode";
- String JMS_DESTINATION = "JMSDestination";
- String JMS_EXPIRATION = "JMSExpiration";
- // public static final String JMS_MESSAGE_ID = "JMSMessageID";
- String JMS_PRIORITY = "JMSPriority";
- String JMS_REDELIVERED = "JMSRedelivered";
- String JMS_REPLY_TO = "JMSReplyTo";
- // public static final String JMS_TIMESTAMP = "JMSTimestamp";
- String JMS_TYPE = "JMSType";
-
- // Defaults
- String defaultAcknowledgeMode = AUTO_ACKNOWLEDGE;
- String defaultDestinationType = TOPIC;
- boolean defaultPreserveJMSHeaders = true;
- long defaultSyncReceiveIntervalMillis = 100;
- long defaultSyncReceiveWaitMillis = 0;
- int defaultMaxProducers = 1;
- String defaultMode = SYNC;
-
- // Errors
- int MISSING_NAME_OR_VALUE = 10800;
- int INVALID_CONTEXT_NAME = 10801;
- int INACCESIBLE_CONTEXT_NAME = 10802;
- int MISSING_PROPERTY_SUBELEMENT = 10803;
- int MISSING_CONNECTION_FACTORY = 10804;
- int INVALID_DESTINATION_TYPE = 10805;
- // int CLIENT_NOT_SUBSCRIBED = 10806;
- int MISSING_DESTINATION_JNDI_NAME = 10807;
- int INVALID_ACKNOWLEDGE_MODE = 10808;
- int INVALID_DELIVERY_MODE = 10809;
- int NONSERIALIZABLE_MESSAGE_BODY = 10810;
- int INVALID_JMS_MESSAGE_TYPE = 10811;
- int NONMAP_MESSAGE_BODY = 10812;
- int NON_QUEUE_DESTINATION = 10813;
- int NON_QUEUE_FACTORY = 10814;
- int NON_TOPIC_DESTINATION = 10815;
- int NON_TOPIC_FACTORY = 10816;
- int INVALID_DELIVERY_MODE_VALUE = 10817;
- int ASYNC_MESSAGE_DELIVERY_NOT_SUPPORTED = 10818;
- int DURABLE_SUBSCRIBER_NOT_SUPPORTED = 10819;
- int CLIENT_UNSUBSCRIBE_DUE_TO_MESSAGE_DELIVERY_ERROR = 10820;
- int CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_REMOVAL = 10821;
- int CLIENT_UNSUBSCRIBE_DUE_TO_CONSUMER_STOP = 10822;
- int MISSING_DURABLE_SUBSCRIPTION_NAME = 10823;
- int JMSINVOCATION_EXCEPTION = 10824;
-}
-
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSConsumer.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSConsumer.java b/core/src/flex/messaging/services/messaging/adapters/JMSConsumer.java
deleted file mode 100644
index fa375f0..0000000
--- a/core/src/flex/messaging/services/messaging/adapters/JMSConsumer.java
+++ /dev/null
@@ -1,464 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging.adapters;
-
-import java.util.Iterator;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.naming.NamingException;
-
-import flex.messaging.MessageException;
-import flex.messaging.log.Log;
-
-/**
- * A JMSProxy subclass for <code>javax.jms.MessageConsumer</code> instance.
- */
-public abstract class JMSConsumer extends JMSProxy implements ExceptionListener
-{
- /* JMS related variables */
- protected MessageConsumer consumer;
-
- protected MessageReceiver messageReceiver;
- protected String selectorExpression;
-
- // Keep track of whether MessageReceiver was set manually by the user or JMSAdapter,
- // or automatically instantiated, so appropriate error messages can be propagated
- // in the former case and suppressed in the latter.
- private boolean messageReceiverManuallySet = false;
-
- /**
- * The lock to use to guard all state changes for the JMSConsumer.
- */
- protected Object lock = new Object();
-
- /**
- * The set of JMS message listeners to notify when a JMS message arrives.
- */
- private final CopyOnWriteArrayList jmsMessageListeners = new CopyOnWriteArrayList();
-
- /**
- * The set of JMS exception listeners to notify when a JMS exception is thrown.
- */
- private final CopyOnWriteArrayList jmsExceptionListeners = new CopyOnWriteArrayList();
-
- //--------------------------------------------------------------------------
- //
- // Initialize, validate, start, and stop methods.
- //
- //--------------------------------------------------------------------------
-
- /**
- * Starts the <code>JMSConsumer</code>. Subclasses should call <code>super.start</code>.
- *
- * @throws NamingException The thrown naming exception.
- * @throws JMSException The thrown JMS exception.
- */
- public void start() throws NamingException, JMSException
- {
- super.start();
-
- if (Log.isInfo())
- Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '"
- + destinationJndiName + "' is starting.");
- }
-
- /**
- * Stops the <code>JMSConsumer</code> by stopping its associated receiver
- * adapter and closing the underlying <code>MessageConsumer</code>. It then
- * calls <code>JMSProxy.close</code> for session and connection closure.
- */
- public void stop()
- {
- if (Log.isInfo())
- Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '"
- + destinationJndiName + "' is stopping.");
-
- stopMessageReceiver();
-
- try
- {
- if (consumer != null)
- consumer.close();
- }
- catch (JMSException e)
- {
- if (Log.isWarn())
- Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS consumer for JMS destination '"
- + destinationJndiName + "' received an error while closing its underlying MessageConsumer: "
- + e.getMessage());
- }
-
- super.stop();
- }
-
- /**
- * Stops the <code>JMSConsumer</code> and unsubscribes a durable subscription
- * if one exists. By default this method delegates to <code>stop()</code>
- * and doesn't remove a durable subscription.
- *
- * @param unsubscribe Determines whether to unsubscribe a durable subscription
- * if one exists, or not.
- */
- public void stop(boolean unsubscribe)
- {
- stop();
- }
-
- //--------------------------------------------------------------------------
- //
- // Public Methods
- //
- //--------------------------------------------------------------------------
-
- /**
- * Adds a JMS message listener.
- *
- * @see flex.messaging.services.messaging.adapters.JMSMessageListener
- *
- * @param listener The listener to add.
- */
- public void addJMSMessageListener(JMSMessageListener listener)
- {
- if (listener != null)
- jmsMessageListeners.addIfAbsent(listener);
- }
-
- /**
- * Removes a JMS message listener.
- *
- * @see flex.messaging.services.messaging.adapters.JMSMessageListener
- *
- * @param listener The listener to remove.
- */
- public void removeJMSMessageListener(JMSMessageListener listener)
- {
- if (listener != null)
- jmsMessageListeners.remove(listener);
- }
-
- /**
- * Adds a JMS exception listener.
- *
- * @see flex.messaging.services.messaging.adapters.JMSExceptionListener
- *
- * @param listener The listener to add.
- */
- public void addJMSExceptionListener(JMSExceptionListener listener)
- {
- if (listener != null)
- jmsExceptionListeners.addIfAbsent(listener);
- }
-
- /**
- * Removes a JMS exception listener.
- *
- * @see flex.messaging.services.messaging.adapters.JMSExceptionListener
- *
- * @param listener The listener to remove.
- */
- public void removeJMSExceptionListener(JMSExceptionListener listener)
- {
- if (listener != null)
- jmsExceptionListeners.remove(listener);
- }
-
- /**
- * Sets the message listener of the underlying MessageConsumer. This method
- * is not meant to be directly called as it is used internally by
- * MessageReceivers that need to perform async message delivery. Any future
- * custom MessageReceiver implementations can use this method to set themselves
- * as MessageListeners to the underlying MessageConsumer.
- *
- * @param listener Message listener to set on the underlying MessageConsumer.
- * @return The old message listener associated with the MessageConsumer.
- * @throws JMSException The thrown JMS exception.
- */
- public MessageListener setMessageListener(MessageListener listener) throws JMSException
- {
- MessageListener oldListener = consumer.getMessageListener();
- consumer.setMessageListener(listener);
- return oldListener;
- }
-
- /**
- * Returns the <code>MessageReceiver</code> used by the consumer to retrieve
- * JMS messages.
- *
- * @return The <code>MessageReceiver</code> used.
- */
- public MessageReceiver getMessageReceiver()
- {
- return messageReceiver;
- }
-
- /**
- * Sets the <code>MessageReceiver</code> used by the consumer to retrieve
- * JMS messages. This property should not change after startup.
- *
- * @param messageReceiver The <code>MessageReceiver</code> used.
- */
- public void setMessageReceiver(MessageReceiver messageReceiver)
- {
- this.messageReceiver = messageReceiver;
- messageReceiverManuallySet = true;
- }
-
- /**
- * Returns the selector expression used when the underlying
- * <code>javax.jms.MessageConsumer</code> is created.
- *
- * @return The selector expression.
- */
- public String getSelectorExpression()
- {
- return selectorExpression;
- }
-
- /**
- * Sets the selector expression used when the underlying
- * <code>javax.jms.MessageConsumer</code> is created. This property should
- * not change after startup.
- *
- * @param selectorExpression The selector expression.
- */
- public void setSelectorExpression(String selectorExpression)
- {
- this.selectorExpression = selectorExpression;
- }
-
- /**
- * Implementation of javax.jms.ExceptionListener.onException.
- * Dispatches the JMS exception to registered JMS exception listeners.
- *
- * @param exception The thrown JMS exception.
- */
- public void onException(JMSException exception)
- {
- if (!jmsExceptionListeners.isEmpty())
- {
- // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
- for (Iterator iter = jmsExceptionListeners.iterator(); iter.hasNext();)
- ((JMSExceptionListener)iter.next()).exceptionThrown(new JMSExceptionEvent(this, exception));
- }
- }
-
- /**
- * Acknowledges the receipt of the message to the JMS server and passes the
- * message to registered JMS message listeners.
- *
- * @param jmsMessage The new JMS message to acknowledge and dispatch.
- */
- public void onMessage(Message jmsMessage)
- {
- acknowledgeMessage(jmsMessage);
-
- if (!jmsMessageListeners.isEmpty())
- {
- // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
- for (Iterator iter = jmsMessageListeners.iterator(); iter.hasNext();)
- ((JMSMessageListener)iter.next()).messageReceived(new JMSMessageEvent(this, jmsMessage));
- }
- }
-
- /**
- * Receive the next message from the underlying MessageConsumer or wait
- * indefinitely until a message arrives if there is no message.
- *
- * @return The received JMS message.
- * @throws JMSException The thrown JMS exception.
- */
- public Message receive() throws JMSException
- {
- if (Log.isInfo())
- Log.getLogger(JMSAdapter.LOG_CATEGORY).info(Thread.currentThread()
- + " JMS consumer for JMS destination '" + destinationJndiName
- + "' is waiting forever until a new message arrives.");
-
- return consumer.receive();
- }
-
- /**
- * Receive the next message from the underlying MessageConsumer within the
- * specified timeout interval.
- *
- * @param timeout The number of milliseconds to wait for a new message.
- * @throws JMSException The thrown JMS exception.
- */
- public Message receive(long timeout) throws JMSException
- {
- if (Log.isInfo())
- Log.getLogger(JMSAdapter.LOG_CATEGORY).info(Thread.currentThread()
- + " JMS consumer for JMS destination '" + destinationJndiName
- + "' is waiting " + timeout + " ms for new message to arrive");
-
- return consumer.receive(timeout);
- }
-
- /**
- * Receive the new message from the underlying MessageConsumer with no wait.
- *
- * @return The received JMS message.
- * @throws JMSException The thrown JMS exception.
- */
- public Message receiveNoWait() throws JMSException
- {
- return consumer.receiveNoWait();
- }
-
- //--------------------------------------------------------------------------
- //
- // Protected and Private Methods
- //
- //--------------------------------------------------------------------------
-
- /**
- * Start the Message Receiver of the <code>JMSConsumer</code>.
- *
- * @throws JMSException The thrown JMS exception.
- */
- void startMessageReceiver() throws JMSException
- {
- initializeMessageReceiver();
- messageReceiver.startReceive();
- connection.start();
- }
-
- /**
- * Stops the Message Receiver of the <code>JMSConsumer</code>.
- */
- void stopMessageReceiver()
- {
- if (messageReceiver != null)
- messageReceiver.stopReceive();
- }
-
- /**
- * Used internally to acknowledge the arrival of a message to the JMS server.
- *
- * @param message The JMS message to acknowledge.
- */
- protected void acknowledgeMessage(Message message)
- {
- if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- try
- {
- message.acknowledge();
- }
- catch (JMSException e)
- {
- if (Log.isInfo())
- Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '"
- + destinationJndiName + "' received an error in message acknowledgement: " + e.getMessage());
- }
- }
- }
-
- /**
- * Initializes the message receiver used by the <code>JMSConsumer</code>.
- * If the message receiver has been manually set, it validates the message
- * receiver. Otherwise, it initializes an async message receiver if it can,
- * and falls back to sync message delivery if it cannot.
- *
- * This method should be called by subclasses once there is an underlying
- * <code>javax.jms.MessageConsumer</code>.
- */
- private void initializeMessageReceiver()
- {
- // If an AsyncMessageReceiver is manually set, make sure the app server
- // allows MessageListener and ExceptionListener for JMS.
- if (messageReceiverManuallySet && messageReceiver != null)
- {
- if (messageReceiver instanceof AsyncMessageReceiver)
- {
- String restrictedMethod = null;
- try
- {
- // Test if MessageListener is restricted.
- restrictedMethod = "javax.jms.MessageConsumer.setMessageListener";
- consumer.getMessageListener();
-
- // Test if ExceptionListener is restricted.
- restrictedMethod = "javax.jms.Connection.setExceptionListener";
- connection.setExceptionListener((AsyncMessageReceiver)messageReceiver);
-
- if (Log.isInfo())
- Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '"
- + destinationJndiName +"' is using async message receiver.");
- }
- catch (JMSException jmsEx)
- {
- // JMS consumer for JMS destination ''{0}'' is configured to use async message receiver but the application server does not allow ''{1}'' call used in async message receiver. Please switch to sync message receiver.
- MessageException me = new MessageException();
- me.setMessage(JMSConfigConstants.ASYNC_MESSAGE_DELIVERY_NOT_SUPPORTED, new Object[] {destinationJndiName, restrictedMethod});
- throw me;
- }
- }
- else if (messageReceiver instanceof SyncMessageReceiver)
- {
- SyncMessageReceiver smr = (SyncMessageReceiver)messageReceiver;
- if (Log.isInfo())
- {
- Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '"
- + destinationJndiName +"' is using sync message receiver"
- + " with sync-receive-interval-millis: " + smr.getSyncReceiveIntervalMillis()
- + ", sync-receive-wait-millis: " + smr.getSyncReceiveWaitMillis());
- }
- }
- }
- // If no MessageReceiver was manually set, set a default MessageReceiver
- // with the following strategy: First try async message delivery. If the
- // app server doesn't allow it, switch to sync message delivery.
- else
- {
- try
- {
- messageReceiver = new AsyncMessageReceiver(this);
-
- // Test if MessageListener is restricted.
- consumer.getMessageListener();
- // Test if ExceptionListener is restricted.
- connection.setExceptionListener((AsyncMessageReceiver)messageReceiver);
-
- if (Log.isInfo())
- Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '"
- + destinationJndiName +"' is using async message receiver.");
- }
- catch (JMSException e)
- {
- SyncMessageReceiver smr = new SyncMessageReceiver(this);
- smr.setSyncReceiveIntervalMillis(1);
- smr.setSyncReceiveWaitMillis(-1);
- messageReceiver = smr;
-
- if (Log.isInfo())
- {
- Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS consumer for JMS destination '"
- + destinationJndiName +"' is using sync message receiver"
- + " with sync-receive-interval-millis: " + smr.getSyncReceiveIntervalMillis()
- + ", sync-receive-wait-millis: " + smr.getSyncReceiveWaitMillis());
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSExceptionEvent.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSExceptionEvent.java b/core/src/flex/messaging/services/messaging/adapters/JMSExceptionEvent.java
deleted file mode 100644
index f809efb..0000000
--- a/core/src/flex/messaging/services/messaging/adapters/JMSExceptionEvent.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging.adapters;
-
-import java.util.EventObject;
-import javax.jms.JMSException;
-
-/**
- * Event dispatched to the JMSExceptionListener when a JMS exception is encountered
- * by the source.
- *
- * @see flex.messaging.services.messaging.adapters.JMSExceptionListener
- */
-public class JMSExceptionEvent extends EventObject
-{
- private JMSException jmsException;
-
- /**
- * Create a new JMSExceptionEvent with the source and exception.
- *
- * @param source The source of the exception.
- * @param jmsException The actual JMS exception.
- */
- JMSExceptionEvent(JMSConsumer source, JMSException jmsException)
- {
- super(source);
- this.jmsException = jmsException;
- }
-
- /**
- * Return the JMS exception of the event.
- *
- * @return The JMS exception of the event.
- */
- public JMSException getJMSException()
- {
- return jmsException;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSExceptionListener.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSExceptionListener.java b/core/src/flex/messaging/services/messaging/adapters/JMSExceptionListener.java
deleted file mode 100644
index 438e337..0000000
--- a/core/src/flex/messaging/services/messaging/adapters/JMSExceptionListener.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging.adapters;
-
-import java.util.EventListener;
-
-/**
- * An interface to be notified when a JMS exception is encountered by the JMS
- * consumer. Implementations of this interface may add themselves as listeners
- * via <code>JMSConsumer.addJMSExceptionListener</code>.
- */
-public interface JMSExceptionListener extends EventListener
-{
- /**
- * Notification that a JMS exception was encountered.
- *
- * @param evt JMSExceptionEvent to dispatch.
- */
- void exceptionThrown(JMSExceptionEvent evt);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSMessageEvent.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSMessageEvent.java b/core/src/flex/messaging/services/messaging/adapters/JMSMessageEvent.java
deleted file mode 100644
index 2d583e7..0000000
--- a/core/src/flex/messaging/services/messaging/adapters/JMSMessageEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging.adapters;
-
-import java.util.EventObject;
-import javax.jms.Message;
-
-/**
- * Event dispatched to the JMSMessageListener when a JMS message is received
- * by the source. The event wraps the received JMS message.
- *
- * @see flex.messaging.services.messaging.adapters.JMSMessageListener
- *
- */
-public class JMSMessageEvent extends EventObject
-{
- private Message message;
-
- /**
- * Create a new JMSMessageEvent with the source and message.
- *
- * @param source The JMSConsumer that is the source of the event.
- * @param message The actual JMS message.
- */
- JMSMessageEvent(JMSConsumer source, javax.jms.Message message)
- {
- super(source);
- this.message = message;
- }
-
- /**
- * Return the JMS message of the event.
- *
- * @return The JMS message that was passed to the constructor.
- */
- public Message getJMSMessage()
- {
- return message;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/services/messaging/adapters/JMSMessageListener.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/services/messaging/adapters/JMSMessageListener.java b/core/src/flex/messaging/services/messaging/adapters/JMSMessageListener.java
deleted file mode 100644
index 671416d..0000000
--- a/core/src/flex/messaging/services/messaging/adapters/JMSMessageListener.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.services.messaging.adapters;
-
-import java.util.EventListener;
-
-/**
- * Listener interface for being notified when the JMS consumer receives a
- * JMS message. Implementations of this interface may add themselves as
- * listeners via <code>JMSConsumer.addJMSMessageListener</code>.
- */
-public interface JMSMessageListener extends EventListener
-{
- /**
- * Notification that a JMS message was received.
- *
- * @param evt The JMSMessageEvent being delivered to this listener.
- */
- void messageReceived(JMSMessageEvent evt);
-}
\ No newline at end of file