You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@jakarta.apache.org by se...@apache.org on 2010/06/24 02:13:21 UTC

svn commit: r957393 - in /jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms: client/OnMessageSubscriber.java client/ReceiveSubscriber.java sampler/SubscriberSampler.java

Author: sebb
Date: Thu Jun 24 00:13:20 2010
New Revision: 957393

URL: http://svn.apache.org/viewvc?rev=957393&view=rev
Log:
Rework subscriber code:
- drop client thread from ReceiveSubscriber and use receive(timeout) instead
- use LinkedBlockingQueue for the Listener queue so that poll-with-timeout can be used
- move startup code to threadStarted() method
- tidy sample loop so it does not need to poll so frequently

Modified:
    jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/OnMessageSubscriber.java
    jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
    jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java

Modified: jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/OnMessageSubscriber.java
URL: http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/OnMessageSubscriber.java?rev=957393&r1=957392&r2=957393&view=diff
==============================================================================
--- jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/OnMessageSubscriber.java (original)
+++ jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/OnMessageSubscriber.java Thu Jun 24 00:13:20 2010
@@ -35,8 +35,8 @@ import org.apache.log.Logger;
 
 /**
  * OnMessageSubscriber is designed to create the connection, session and
- * subscriber. The sampler is responsible for implementing
- * javax.jms.MessageListener interface and onMessage(Message msg) method.
+ * subscriber (MessageConsumer). The sampler is responsible for implementing
+ * javax.jms.MessageListener interface and the onMessage(Message msg) method.
  *
  * The implementation provides a close() method to clean up the client at the
  * end of a test. This is important to make sure there aren't any zombie threads
@@ -54,60 +54,70 @@ public class OnMessageSubscriber impleme
 
     /**
      * Constructor takes the necessary JNDI related parameters to create a
-     * connection and begin receiving messages.
-     *
-     * @param useProps
-     * @param jndi
-     * @param url
+     * connection and prepare to begin receiving messages.
+     * <br/>
+     * The caller must then invoke {@link #start()} to enable message reception.
+     * 
+     * @param useProps if true, use jndi.properties instead of 
+     * initialContextFactory, providerUrl, securityPrincipal, securityCredentials
+     * @param initialContextFactory
+     * @param providerUrl
      * @param connfactory
      * @param destinationName
      * @param useAuth
-     * @param user
-     * @param pwd
+     * @param securityPrincipal
+     * @param securityCredentials
      * @throws JMSException if could not create context or other problem occurred.
      * @throws NamingException 
      */
-    public OnMessageSubscriber(boolean useProps, String jndi, String url, String connfactory, String destinationName,
-            boolean useAuth, String user, String pwd) throws JMSException, NamingException {
-        Context ctx = InitialContextFactory.getContext(useProps, jndi, url, useAuth, user, pwd);
+    public OnMessageSubscriber(boolean useProps, 
+            String initialContextFactory, String providerUrl, String connfactory, String destinationName,
+            boolean useAuth, 
+            String securityPrincipal, String securityCredentials) throws JMSException, NamingException {
+        Context ctx = InitialContextFactory.getContext(useProps, 
+                initialContextFactory, providerUrl, useAuth, securityPrincipal, securityCredentials);
         CONN = Utils.getConnection(ctx, connfactory);
         SESSION = CONN.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination dest = Utils.lookupDestination(ctx, destinationName);
         SUBSCRIBER = SESSION.createConsumer(dest);
+        log.debug("<init> complete");
+    }
+
+    /**
+     * Calls Connection.start() to begin receiving inbound messages.
+     * @throws JMSException 
+     */
+    public void start() throws JMSException {
+        log.debug("start()");
+        CONN.start();
     }
 
     /**
-     * resume will call Connection.start() to begin receiving inbound messages.
+     * Calls Connection.stop() to stop receiving inbound messages.
+     * @throws JMSException 
      */
-    public void resume() {
-        try {
-            this.CONN.start();
-        } catch (JMSException e) {
-            log.error("failed to start recieving");
-        }
+    public void stop() throws JMSException {
+        log.debug("stop()");
+        CONN.stop();
     }
 
     /**
      * close will close all the objects
      */
     public void close() {
-        log.info("Subscriber closed");
+        log.debug("close()");
         Utils.close(SUBSCRIBER, log);
         Utils.close(SESSION, log);
         Utils.close(CONN, log);
     }
 
     /**
-     * The sample uses this method to set itself as the listener. That means the
-     * sampler need to implement MessageListener interface.
+     * Set the MessageListener.
      *
      * @param listener
+     * @throws JMSException 
      */
-    public void setMessageListener(MessageListener listener) {
-        try {
-            this.SUBSCRIBER.setMessageListener(listener);
-        } catch (JMSException e) {
-            log.error(e.getMessage());
-        }
+    public void setMessageListener(MessageListener listener) throws JMSException {
+       SUBSCRIBER.setMessageListener(listener);
     }
 }

Modified: jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
URL: http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java?rev=957393&r1=957392&r2=957393&view=diff
==============================================================================
--- jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java (original)
+++ jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java Thu Jun 24 00:13:20 2010
@@ -19,16 +19,13 @@
 package org.apache.jmeter.protocol.jms.client;
 
 import java.io.Closeable;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
-import javax.jms.TextMessage;
 import javax.naming.Context;
 import javax.naming.NamingException;
 
@@ -37,19 +34,10 @@ import org.apache.jorphan.logging.Loggin
 import org.apache.log.Logger;
 
 /**
- * Receives messages in a separate thread until told to stop.
- * Run loop permanently receives messages; the sampler calls reset()
- * when it has taken enough messages.
- *
+ * Uses MessageConsumer.receive(timeout) to fetch messages.
+ * Does not cache any messages.
  */
-/*
- * TODO Needs rework - there is a window between receiving a message and calling reset()
- * which means that a message can be lost. It's not clear why a separate thread is needed,
- * given that the sampler loops until enough samples have been received.
- * Also, messages are received in wait mode, so the RUN flag won't be checked until
- * at least one more message has been received.
-*/
-public class ReceiveSubscriber implements Runnable, Closeable {
+public class ReceiveSubscriber implements Closeable {
 
     private static final Logger log = LoggingManager.getLoggerForClass();
 
@@ -59,115 +47,87 @@ public class ReceiveSubscriber implement
 
     private final MessageConsumer SUBSCRIBER;
 
-    //@GuardedBy("this")
-    private int counter;
 
-    // Only MapMessage and TextMessage are currently supported
-    private final ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<Message>();
-
-    private volatile boolean RUN = true;
-    // Needs to be volatile to ensure value is picked up
-
-    //@GuardedBy("this")
-    private Thread CLIENTTHREAD;
-
-    public ReceiveSubscriber(boolean useProps, String jndi, String url, String connfactory, String destinationName,
-            boolean useAuth, String user, String pwd) throws NamingException, JMSException {
-        Context ctx = InitialContextFactory.getContext(useProps, jndi, url, useAuth, user, pwd);
+    /**
+     * Constructor takes the necessary JNDI related parameters to create a
+     * connection and prepare to begin receiving messages.
+     * <br/>
+     * The caller must then invoke {@link #start()} to enable message reception.
+     * 
+     * @param useProps if true, use jndi.properties instead of 
+     * initialContextFactory, providerUrl, securityPrincipal, securityCredentials
+     * @param initialContextFactory
+     * @param providerUrl
+     * @param connfactory
+     * @param destinationName
+     * @param useAuth
+     * @param securityPrincipal
+     * @param securityCredentials
+     * @throws JMSException if could not create context or other problem occurred.
+     * @throws NamingException 
+     */
+    public ReceiveSubscriber(boolean useProps, 
+            String initialContextFactory, String providerUrl, String connfactory, String destinationName,
+            boolean useAuth, 
+            String securityPrincipal, String securityCredentials) throws NamingException, JMSException {
+        Context ctx = InitialContextFactory.getContext(useProps, 
+                initialContextFactory, providerUrl, useAuth, securityPrincipal, securityCredentials);
         CONN = Utils.getConnection(ctx, connfactory);
         SESSION = CONN.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination dest = Utils.lookupDestination(ctx, destinationName);
         SUBSCRIBER = SESSION.createConsumer(dest);
+        log.debug("<init> complete");
     }
 
     /**
-     * Resume will call Connection.start() and begin receiving messages from the
-     * JMS provider.
+     * Calls Connection.start() to begin receiving inbound messages.
+     * @throws JMSException 
      */
-    public void resume() {
-        if (this.CONN == null) {
-            log.error("Connection not set up");
-            return;
-        }
-        try {
-            this.CONN.start();
-        } catch (JMSException e) {
-            log.error("failed to start recieving");
-        }
+    public void start() throws JMSException {
+        log.debug("start()");
+        CONN.start();
     }
 
     /**
-     * Get the message
-     * @return the next message from the queue or null if none
+     * Calls Connection.stop() to stop receiving inbound messages.
+     * @throws JMSException 
      */
-    public synchronized Message getMessage() {
-        Message msg = queue.poll();
-        if (msg != null) {
-            counter--;
-        }
-        return msg;
+    public void stop() throws JMSException {
+        log.debug("stop()");
+        CONN.stop();
     }
 
     /**
+     * Get the next message or null.
+     * Never blocks for longer than the specified timeout.
+     * 
+     * @param timeout in milliseconds
+     * @return the next message or null
+     * 
+     * @throws JMSException
+     */
+    public Message getMessage(long timeout) throws JMSException {
+        Message message = null;
+        if (timeout < 10) { // Allow for short/negative times
+            message = SUBSCRIBER.receiveNoWait();                
+        } else {
+            message = SUBSCRIBER.receive(timeout);
+        }
+        return message;
+    }
+    /**
      * close() will stop the connection first. Then it closes the subscriber,
      * session and connection.
      */
     public synchronized void close() { // called from testEnded() thread
-        this.RUN = false;
+        log.debug("close()");
         try {
-            this.CONN.stop();
-            Utils.close(SUBSCRIBER, log);
-            Utils.close(SESSION, log);
-            Utils.close(CONN, log);
-            this.CLIENTTHREAD.interrupt();
-            this.CLIENTTHREAD = null;
-            queue.clear();
+            CONN.stop();
         } catch (JMSException e) {
             log.error(e.getMessage());
-        } catch (Exception e) {
-            log.error(e.getMessage());
-        }
-    }
-
-    /**
-     * Increment the count and return the new value
-     *
-     * @param increment
-     */
-    public synchronized int count(int increment) {
-        counter += increment;
-        return counter;
-    }
-
-    /**
-     * start will create a new thread and pass this class. once the thread is
-     * created, it calls Thread.start().
-     */
-    public void start() {
-        this.CLIENTTHREAD = new Thread(this, "Subscriber2");
-        this.CLIENTTHREAD.start();
-    }
-
-    /**
-     * run calls listen to begin listening for inbound messages from the
-     * provider.
-     * 
-     * Updates the count field so the caller can check how many messages have been receieved.
-     * 
-     */
-    public void run() {
-        while (RUN) {
-            try {
-                Message message = this.SUBSCRIBER.receive();
-                if (message instanceof TextMessage || message instanceof MapMessage) {
-                    queue.add(message);
-                    count(1);
-                } else if (message != null){
-                	log.warn("Discarded non Map|TextMessage " +  message);
-                }
-            } catch (JMSException e) {
-                log.error("Communication error: " + e.getMessage());
-            }
         }
+        Utils.close(SUBSCRIBER, log);
+        Utils.close(SESSION, log);
+        Utils.close(CONN, log);
     }
 }

Modified: jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
URL: http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java?rev=957393&r1=957392&r2=957393&view=diff
==============================================================================
--- jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java (original)
+++ jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java Thu Jun 24 00:13:20 2010
@@ -18,7 +18,8 @@
 package org.apache.jmeter.protocol.jms.sampler;
 
 import java.util.Enumeration;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
@@ -30,6 +31,7 @@ import javax.naming.NamingException;
 import org.apache.jmeter.samplers.Interruptible;
 import org.apache.jmeter.samplers.SampleResult;
 import org.apache.jmeter.testelement.TestListener;
+import org.apache.jmeter.testelement.ThreadListener;
 import org.apache.jmeter.util.JMeterUtils;
 import org.apache.jmeter.engine.event.LoopIterationEvent;
 
@@ -45,22 +47,34 @@ import org.apache.log.Logger;
 /**
  * This class implements the JMS Subcriber sampler
  */
-public class SubscriberSampler extends BaseJMSSampler implements Interruptible, TestListener, MessageListener {
+public class SubscriberSampler extends BaseJMSSampler implements Interruptible, TestListener, ThreadListener, MessageListener {
 
     private static final long serialVersionUID = 240L;
 
     private static final Logger log = LoggingManager.getLoggerForClass();
 
+    // Default wait (ms) for a message if timeouts are not enabled
+    // This is the maximum time the sampler can be blocked.
+    private static final long DEFAULT_WAIT = 500L;
+
     // No need to synch/ - only used by sampler and ClientPool (which does its own synch)
     private transient ReceiveSubscriber SUBSCRIBER = null;
 
-    // Only MapMessage and TextMessage are currently supported
-    private final ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<Message>();
+    /*
+     * We use a LinkedBlockingQueue (rather than a ConcurrentLinkedQueue) because it has a
+     * poll-with-wait method that avoids the need to use a polling loop.
+     */
+    private transient LinkedBlockingQueue<Message> queue;
     
     private transient volatile boolean interrupted = false;
 
     private transient long timeout;
 
+    private boolean useReceive;
+
+    // This will be null iff initialisation succeeds.
+    private transient Exception exceptionDuringInit;
+
     // Don't change the string, as it is used in JMX files
     private static final String CLIENT_CHOICE = "jms.client_choice"; // $NON-NLS-1$
     private static final String TIMEOUT = "jms.timeout"; // $NON-NLS-1$
@@ -105,23 +119,20 @@ public class SubscriberSampler extends B
      * @throws NamingException 
      *
      */
-    private OnMessageSubscriber initListenerClient() throws JMSException, NamingException {
-        timeout = getTimeoutAsLong();
-        interrupted = false;
+    private void initListenerClient() throws JMSException, NamingException {
         OnMessageSubscriber sub = (OnMessageSubscriber) ClientPool.get(this);
         if (sub == null) {
             sub = new OnMessageSubscriber(getUseJNDIPropertiesAsBoolean(), getJNDIInitialContextFactory(),
                     getProviderUrl(), getConnectionFactory(), getDestination(), 
                     isUseAuth(), getUsername(), getPassword());
-            queue.clear();
+            queue = new LinkedBlockingQueue<Message>();
             sub.setMessageListener(this);
-            sub.resume();
+            sub.start();
             ClientPool.addClient(sub);
             ClientPool.put(this, sub);
             log.debug("SubscriberSampler.initListenerClient called");
             log.debug("loop count " + getIterations());
         }
-        return sub;
     }
 
     /**
@@ -130,12 +141,10 @@ public class SubscriberSampler extends B
      * @throws JMSException 
      */
     private void initReceiveClient() throws NamingException, JMSException {
-        timeout = getTimeoutAsLong();
-        interrupted = false;
         SUBSCRIBER = new ReceiveSubscriber(getUseJNDIPropertiesAsBoolean(),
                 getJNDIInitialContextFactory(), getProviderUrl(), getConnectionFactory(), getDestination(),
                 isUseAuth(), getUsername(), getPassword());
-        SUBSCRIBER.resume();
+        SUBSCRIBER.start();
         ClientPool.addClient(SUBSCRIBER);
         log.debug("SubscriberSampler.initReceiveClient called");
     }
@@ -148,68 +157,58 @@ public class SubscriberSampler extends B
      */
     @Override
     public SampleResult sample() {
-        if (getClientChoice().equals(JMSSubscriberGui.RECEIVE_RSC)) {
-            return sampleWithReceive();
+        SampleResult result = new SampleResult();
+        result.setDataType(SampleResult.TEXT);
+        result.setSampleLabel(getName());
+        result.sampleStart();
+        if (exceptionDuringInit != null) {
+            result.sampleEnd();
+            result.setSuccessful(false);
+            result.setResponseCode("000");
+            result.setResponseMessage(exceptionDuringInit.toString());
+            return result; 
+        }
+        if (useReceive) {
+            return sampleWithReceive(result);
         } else {
-            return sampleWithListener();
+            return sampleWithListener(result);
         }
     }
 
     /**
      * sample will block until messages are received
+     * @param result 
      *
      * @return the sample result
      */
-    private SampleResult sampleWithListener() {
-        SampleResult result = new SampleResult();
-        result.setDataType(SampleResult.TEXT);
+    private SampleResult sampleWithListener(SampleResult result) {
         StringBuilder buffer = new StringBuilder();
         StringBuilder propBuffer = new StringBuilder();
-        int cnt;
-        int loop = getIterationCount();
 
-        
-        result.setSampleLabel(getName());
-        try {
-            initListenerClient();
-        } catch (JMSException ex) {
-            result.sampleStart();
-            result.sampleEnd();
-            result.setResponseCode("000");
-            result.setResponseMessage(ex.toString());
-            return result;
-        } catch (NamingException ex) {
-            result.sampleStart();
-            result.sampleEnd();
-            result.setResponseCode("000");
-            result.setResponseMessage(ex.toString());
-            return result;
-        }
+        int loop = getIterationCount();
+        int read = 0;
 
         long until = 0L;
+        long now = System.currentTimeMillis();
         if (timeout > 0) {
-            until = timeout + System.currentTimeMillis(); 
+            until = timeout + now; 
         }
-        result.sampleStart();
         while (!interrupted
-                && (until == 0 || System.currentTimeMillis() < until)
-                && queue.size() < loop) {// check this last as it is the most expensive
+                && (until == 0 || now < until)
+                && read < loop) {
             try {
-                Thread.sleep(0, 50);
+                Message msg = queue.poll(calculateWait(until, now), TimeUnit.MILLISECONDS);
+                if (msg != null) {
+                    read++;
+                    extractContent(buffer, propBuffer, msg);
+                }
             } catch (InterruptedException e) {
-                log.debug(e.getMessage());
+                // Ignored
             }
+            now = System.currentTimeMillis();
         }
         result.sampleEnd();
        
-        int read = 0;
-        for(cnt = 0; cnt < loop ; cnt++) {
-            Message msg = queue.poll();
-            if (msg != null) {
-                read++;
-                extractContent(buffer, propBuffer, msg);
-            }
-        }
         if (getReadResponseAsBoolean()) {
             result.setResponseData(buffer.toString().getBytes());
         } else {
@@ -217,11 +216,12 @@ public class SubscriberSampler extends B
         }
         result.setResponseHeaders(propBuffer.toString());
         result.setDataType(SampleResult.TEXT);
-        result.setSuccessful(true);
         if (read == 0) {
             result.setResponseCode("404"); // Not found
-        } else {
+            result.setSuccessful(false);
+        } else { // TODO set different status if not enough messages found?
             result.setResponseCodeOK();
+            result.setSuccessful(true);
         }
         result.setResponseMessage(read + " messages received");
         result.setSamplerData(loop + " messages expected");
@@ -233,70 +233,51 @@ public class SubscriberSampler extends B
     /**
      * Sample method uses the ReceiveSubscriber client instead of onMessage
      * approach.
+     * @param result 
      *
      * @return the sample result
      */
-    private SampleResult sampleWithReceive() {
-        SampleResult result = new SampleResult();
-        result.setDataType(SampleResult.TEXT);
+    private SampleResult sampleWithReceive(SampleResult result) {
         StringBuilder buffer = new StringBuilder();
         StringBuilder propBuffer = new StringBuilder();
-        int cnt;
-        
         
-        result.setSampleLabel(getName());
-        if (SUBSCRIBER == null) { // TODO perhaps do this in test[Iteration]Start?
-            try {
-                initReceiveClient();
-            } catch (NamingException ex) {
-                result.sampleStart();
-                result.sampleEnd();
-                result.setResponseCode("000");
-                result.setResponseMessage(ex.toString());
-                return result;
-            } catch (JMSException ex) {
-                result.sampleStart();
-                result.sampleEnd();
-                result.setResponseCode("000");
-                result.setResponseMessage(ex.toString());
-                return result;
-            }
-            SUBSCRIBER.start();
-        }
         int loop = getIterationCount();
+        int read = 0;
 
         long until = 0L;
+        long now = System.currentTimeMillis();
         if (timeout > 0) {
-            until = timeout + System.currentTimeMillis(); 
+            until = timeout + now; 
         }
-        result.sampleStart();
         while (!interrupted
-                && (until == 0 || System.currentTimeMillis() < until)
-                && SUBSCRIBER.count(0) < loop) { // check this last as it is most expensive
+                && (until == 0 || now < until)
+                && read < loop) {
+            Message msg;
             try {
-                Thread.sleep(0, 50);
-            } catch (InterruptedException e) {
-                log.debug(e.getMessage());
+                msg = SUBSCRIBER.getMessage(calculateWait(until, now));
+                if (msg != null){
+                    read++;
+                    extractContent(buffer, propBuffer, msg);
+                }
+            } catch (JMSException e) {
+                log.warn("Error "+e.toString());
             }
+            now = System.currentTimeMillis();
         }
         result.sampleEnd();
-        int read = SUBSCRIBER.count(0);
         result.setResponseMessage(read + " samples messages received");
-        for(cnt = 0; cnt < read ; cnt++) {
-            Message msg = SUBSCRIBER.getMessage();
-            extractContent(buffer, propBuffer, msg);
-        }
         if (getReadResponseAsBoolean()) {
             result.setResponseData(buffer.toString().getBytes());
         } else {
             result.setBytes(buffer.toString().getBytes().length);
         }
         result.setResponseHeaders(propBuffer.toString());
-        result.setSuccessful(true);
         if (read == 0) {
             result.setResponseCode("404"); // Not found
-        } else {
+            result.setSuccessful(false);
+        } else { // TODO set different status if not enough messages found?
             result.setResponseCodeOK();
+            result.setSuccessful(true);
         }
         result.setResponseMessage(read + " message(s) received successfully");
         result.setSamplerData(loop + " messages expected");
@@ -305,6 +286,19 @@ public class SubscriberSampler extends B
         return result;
     }
 
+    /**
+     * Calculate the wait time, will never be more than DEFAULT_WAIT.
+     * 
+     * @param until target end time or 0 if timeouts not active
+     * @param now current time
+     * @return wait time
+     */
+    private long calculateWait(long until, long now) {
+        if (until == 0) return DEFAULT_WAIT; // Timeouts not active
+        long wait = until - now; // How much left
+        return wait > DEFAULT_WAIT ? DEFAULT_WAIT : wait;
+    }
+
     private void extractContent(StringBuilder buffer, StringBuilder propBuffer,
             Message msg) {
         if (msg != null) {
@@ -337,9 +331,9 @@ public class SubscriberSampler extends B
      * The sampler implements MessageListener directly and sets itself as the
      * listener with the MessageConsumer.
      */
-    public synchronized void onMessage(Message message) {
-        if (message instanceof TextMessage || message instanceof MapMessage) {
-            queue.add(message);
+    public void onMessage(Message message) {
+        if (!queue.offer(message)){
+            log.warn("Could not add message to queue");
         }
     }
 
@@ -391,4 +385,34 @@ public class SubscriberSampler extends B
 
     // This was the old value that was checked for
     private final static String RECEIVE_STR = JMeterUtils.getResString(JMSSubscriberGui.RECEIVE_RSC); // $NON-NLS-1$
+
+    public void threadFinished() {
+    }
+
+    public void threadStarted() {
+        timeout = getTimeoutAsLong();
+        interrupted = false;
+        exceptionDuringInit = null;
+        useReceive = getClientChoice().equals(JMSSubscriberGui.RECEIVE_RSC);
+        if (useReceive) {
+            try {
+                initReceiveClient();
+            } catch (NamingException e) {
+                exceptionDuringInit = e;
+            } catch (JMSException e) {
+                exceptionDuringInit = e;
+            }
+        } else {
+            try {
+                initListenerClient();
+            } catch (JMSException e) {
+                exceptionDuringInit = e;
+            } catch (NamingException e) {
+                exceptionDuringInit = e;
+            }
+        }
+        if (exceptionDuringInit != null){
+            log.error("Could not initialise client",exceptionDuringInit);
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@jakarta.apache.org
For additional commands, e-mail: notifications-help@jakarta.apache.org