You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jmeter-dev@jakarta.apache.org by se...@apache.org on 2008/11/11 22:37:29 UTC

svn commit: r713179 - /jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java

Author: sebb
Date: Tue Nov 11 13:37:26 2008
New Revision: 713179

URL: http://svn.apache.org/viewvc?rev=713179&view=rev
Log:
Lots of thread-safety fixes, but still needs some work
Merged close() and resetCount() methods as they need to be done together

Modified:
    jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java

Modified: jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
URL: http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java?rev=713179&r1=713178&r2=713179&view=diff
==============================================================================
--- jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java (original)
+++ jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java Tue Nov 11 13:37:26 2008
@@ -32,46 +32,71 @@
 import org.apache.jorphan.logging.LoggingManager;
 import org.apache.log.Logger;
 
+/**
+ * Receives messages in a separate thread until told to stop.
+ * Run loop permanently receives messages; the sampler calls reset()
+ * when it has taken enough messages.
+ *
+ */
+/*
+ * TODO Needs rework - there is a window between receiving a message and calling reset()
+ * which means that a message can be lost. It's not clear why a separate thread is needed,
+ * given that the sampler loops until enough samples have been received.
+ * Also, messages are received in wait mode, so the RUN flag won't be checked until
+ * at least one more message has been received.
+*/
 public class ReceiveSubscriber implements Runnable {
 
     private static final Logger log = LoggingManager.getLoggerForClass();
 
-    private TopicConnection CONN = null;
+    private final TopicConnection CONN;
 
-    private TopicSession SESSION = null;
+    private final TopicSession SESSION;
 
-    private Topic TOPIC = null;
+    private final Topic TOPIC;
 
-    private TopicSubscriber SUBSCRIBER = null;
-
-    private byte[] RESULT = null;
+    private final TopicSubscriber SUBSCRIBER;
 
+    //@GuardedBy("this")
     private int counter;
 
     private int loop = 1; // TODO never read
 
+    //@GuardedBy("this")
     private StringBuffer buffer = new StringBuffer();
 
+    //@GuardedBy("this")
     private volatile boolean RUN = true;
     // Needs to be volatile to ensure value is picked up
 
-    private Thread CLIENTTHREAD = null;
-
-    /**
-     *
-     */
-    public ReceiveSubscriber() {
-        super();
-    }
+    //@GuardedBy("this")
+    private Thread CLIENTTHREAD;
 
     public ReceiveSubscriber(boolean useProps, String jndi, String url, String connfactory, String topic,
             boolean useAuth, String user, String pwd) {
         Context ctx = initJNDI(useProps, jndi, url, useAuth, user, pwd);
+        TopicConnection _conn = null;
+        Topic _topic = null;
+        TopicSession _session = null;
+        TopicSubscriber _subscriber = null;
         if (ctx != null) {
-            initConnection(ctx, connfactory, topic);
+            try {
+                ConnectionFactory.getTopicConnectionFactory(ctx,connfactory);
+                _conn = ConnectionFactory.getTopicConnection();
+                _topic = InitialContextFactory.lookupTopic(ctx, topic);
+                _session = _conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); // use the locals: the final fields are only assigned at the end of the ctor
+                _subscriber = _session.createSubscriber(_topic);
+                log.info("created the topic connection successfully");
+            } catch (JMSException e) {
+                log.error("Connection error: " + e.getMessage());
+            }
         } else {
             log.error("Could not initialize JNDI Initial Context Factory");
         }
+        this.CONN = _conn;
+        this.TOPIC = _topic;
+        this.SESSION = _session;
+        this.SUBSCRIBER = _subscriber;
     }
 
     /**
@@ -85,7 +110,8 @@
      * @param pwd
      * @return  the JNDI initial context or null
      */
-    public Context initJNDI(boolean useProps, String jndi, String url, boolean useAuth, String user, String pwd) {
+    // Called by ctor
+    private Context initJNDI(boolean useProps, String jndi, String url, boolean useAuth, String user, String pwd) {
         if (useProps) {
             try {
                 return new InitialContext();
@@ -99,26 +125,6 @@
     }
 
     /**
-     * Create the connection, session and topic subscriber
-     *
-     * @param ctx
-     * @param connfactory
-     * @param topic
-     */
-    public void initConnection(Context ctx, String connfactory, String topic) {
-        try {
-            ConnectionFactory.getTopicConnectionFactory(ctx,connfactory);
-            this.CONN = ConnectionFactory.getTopicConnection();
-            this.TOPIC = InitialContextFactory.lookupTopic(ctx, topic);
-            this.SESSION = this.CONN.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
-            this.SUBSCRIBER = this.SESSION.createSubscriber(this.TOPIC);
-            log.info("created the topic connection successfully");
-        } catch (JMSException e) {
-            log.error("Connection error: " + e.getMessage());
-        }
-    }
-
-    /**
      * Set the number of iterations for each call to sample()
      *
      * @param loop
@@ -147,53 +153,45 @@
      * Get the message as a string
      *
      */
-    public String getMessage() {
+    public synchronized String getMessage() {
         return this.buffer.toString();
     }
 
     /**
      * Get the message(s) as an array of byte[]
-     *
+     * 
      */
-    public byte[] getByteResult() {
-        if (this.buffer.length() > 0) {
-            this.RESULT = this.buffer.toString().getBytes();
-        }
-        return this.RESULT;
+    public synchronized byte[] getByteResult() {
+        return this.buffer.toString().getBytes();
     }
 
     /**
      * close() will stop the connection first. Then it closes the subscriber,
-     * session and connection and sets them to null.
+     * session and connection.
      */
-    public synchronized void close() {
+    public synchronized void close() { // called from testEnded() thread
         try {
+            this.RUN = false;
             this.CONN.stop();
             this.SUBSCRIBER.close();
             this.SESSION.close();
             this.CONN.close();
-            this.SUBSCRIBER = null;
-            this.SESSION = null;
-            this.CONN = null;
-            this.RUN = false;
             this.CLIENTTHREAD.interrupt();
             this.CLIENTTHREAD = null;
             this.buffer.setLength(0);
-            this.buffer = null;
         } catch (JMSException e) {
             log.error(e.getMessage());
-        } catch (Throwable e) {
+        } catch (Exception e) {
             log.error(e.getMessage());
         }
     }
 
     /**
-     * Clear will set the buffer to zero and the result objects to null. Clear
-     * should be called at the end of a sample.
+     * Reset the receiver ready for receiving any further messages
      */
-    public void clear() {
+    public synchronized void reset() {
+        counter = 0;
         this.buffer.setLength(0);
-        this.RESULT = null;
     }
 
     /**
@@ -207,52 +205,46 @@
     }
 
     /**
-     * Reset will reset the counter and prepare for the next sample() call.
-     *
-     */
-    public synchronized int resetCount() {
-        counter = 0;
-        return counter;
-    }
-
-    /**
      * start will create a new thread and pass this class. once the thread is
      * created, it calls Thread.start().
      */
     public void start() {
+        // No point starting thread unless subscriber exists
+        if (SUBSCRIBER == null) {
+            log.error("Subscriber has not been set up");
+            return;
+        }
         this.CLIENTTHREAD = new Thread(this, "Subscriber2");
         this.CLIENTTHREAD.start();
     }
 
     /**
-     * run calls listen to begin listening for inboud messages from the
+     * run calls listen to begin listening for inbound messages from the
      * provider.
+     * 
+     * Updates the count field so the caller can check how many messages have been received.
+     * 
      */
     public void run() {
-        ReceiveSubscriber.this.listen();
-    }
-
-    /**
-     * Listen for inbound messages
-     */
-    protected void listen() {
-        log.info("Subscriber2.listen() called");
+        if (SUBSCRIBER == null) { // just in case
+            log.error("Subscriber has not been set up");
+            return;
+        }
         while (RUN) {
-            if (SUBSCRIBER == null) {
-                log.error("Subscriber has not been set up");
-                break;
-            }
             try {
                 Message message = this.SUBSCRIBER.receive();
                 if (message != null && message instanceof TextMessage) {
                     TextMessage msg = (TextMessage) message;
-                    if (msg.getText().trim().length() > 0) {
-                        this.buffer.append(msg.getText());
-                        count(1);
+                    String text = msg.getText();
+                    if (text.trim().length() > 0) {
+                        synchronized (this) {
+                            this.buffer.append(text);
+                            count(1);
+                        }
                     }
                 }
             } catch (JMSException e) {
-                log.info("Communication error: " + e.getMessage());
+                log.error("Communication error: " + e.getMessage());
             }
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: jmeter-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: jmeter-dev-help@jakarta.apache.org