You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jmeter-dev@jakarta.apache.org by se...@apache.org on 2008/11/11 22:37:29 UTC
svn commit: r713179 -
/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
Author: sebb
Date: Tue Nov 11 13:37:26 2008
New Revision: 713179
URL: http://svn.apache.org/viewvc?rev=713179&view=rev
Log:
Lots of thread-safety fixes, but still needs some work
Merged close() and resetCount() methods as they need to be done together
Modified:
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
Modified: jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
URL: http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java?rev=713179&r1=713178&r2=713179&view=diff
==============================================================================
--- jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java (original)
+++ jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java Tue Nov 11 13:37:26 2008
@@ -32,46 +32,71 @@
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;
+/**
+ * Receives messages in a separate thread until told to stop.
+ * Run loop permanently receives messages; the sampler calls reset()
+ * when it has taken enough messages.
+ *
+ */
+/*
+ * TODO Needs rework - there is a window between receiving a message and calling reset()
+ * which means that a message can be lost. It's not clear why a separate thread is needed,
+ * given that the sampler loops until enough samples have been received.
+ * Also, messages are received in wait mode, so the RUN flag won't be checked until
+ * at least one more message has been received.
+*/
public class ReceiveSubscriber implements Runnable {
private static final Logger log = LoggingManager.getLoggerForClass();
- private TopicConnection CONN = null;
+ private final TopicConnection CONN;
- private TopicSession SESSION = null;
+ private final TopicSession SESSION;
- private Topic TOPIC = null;
+ private final Topic TOPIC;
- private TopicSubscriber SUBSCRIBER = null;
-
- private byte[] RESULT = null;
+ private final TopicSubscriber SUBSCRIBER;
+ //@GuardedBy("this")
private int counter;
private int loop = 1; // TODO never read
+ //@GuardedBy("this")
private StringBuffer buffer = new StringBuffer();
+ //@GuardedBy("this")
private volatile boolean RUN = true;
// Needs to be volatile to ensure value is picked up
- private Thread CLIENTTHREAD = null;
-
- /**
- *
- */
- public ReceiveSubscriber() {
- super();
- }
+ //@GuardedBy("this")
+ private Thread CLIENTTHREAD;
public ReceiveSubscriber(boolean useProps, String jndi, String url, String connfactory, String topic,
boolean useAuth, String user, String pwd) {
Context ctx = initJNDI(useProps, jndi, url, useAuth, user, pwd);
+ TopicConnection _conn = null;
+ Topic _topic = null;
+ TopicSession _session = null;
+ TopicSubscriber _subscriber = null;
if (ctx != null) {
- initConnection(ctx, connfactory, topic);
+ try {
+ ConnectionFactory.getTopicConnectionFactory(ctx,connfactory);
+ _conn = ConnectionFactory.getTopicConnection();
+ _topic = InitialContextFactory.lookupTopic(ctx, topic);
+ _session = this.CONN.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
+ _subscriber = this.SESSION.createSubscriber(this.TOPIC);
+ log.info("created the topic connection successfully");
+ } catch (JMSException e) {
+ log.error("Connection error: " + e.getMessage());
+ }
} else {
log.error("Could not initialize JNDI Initial Context Factory");
}
+ this.CONN = _conn;
+ this.TOPIC = _topic;
+ this.SESSION = _session;
+ this.SUBSCRIBER = _subscriber;
}
/**
@@ -85,7 +110,8 @@
* @param pwd
* @return the JNDI initial context or null
*/
- public Context initJNDI(boolean useProps, String jndi, String url, boolean useAuth, String user, String pwd) {
+ // Called by ctor
+ private Context initJNDI(boolean useProps, String jndi, String url, boolean useAuth, String user, String pwd) {
if (useProps) {
try {
return new InitialContext();
@@ -99,26 +125,6 @@
}
/**
- * Create the connection, session and topic subscriber
- *
- * @param ctx
- * @param connfactory
- * @param topic
- */
- public void initConnection(Context ctx, String connfactory, String topic) {
- try {
- ConnectionFactory.getTopicConnectionFactory(ctx,connfactory);
- this.CONN = ConnectionFactory.getTopicConnection();
- this.TOPIC = InitialContextFactory.lookupTopic(ctx, topic);
- this.SESSION = this.CONN.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
- this.SUBSCRIBER = this.SESSION.createSubscriber(this.TOPIC);
- log.info("created the topic connection successfully");
- } catch (JMSException e) {
- log.error("Connection error: " + e.getMessage());
- }
- }
-
- /**
* Set the number of iterations for each call to sample()
*
* @param loop
@@ -147,53 +153,45 @@
* Get the message as a string
*
*/
- public String getMessage() {
+ public synchronized String getMessage() {
return this.buffer.toString();
}
/**
* Get the message(s) as an array of byte[]
- *
+ *
*/
- public byte[] getByteResult() {
- if (this.buffer.length() > 0) {
- this.RESULT = this.buffer.toString().getBytes();
- }
- return this.RESULT;
+ public synchronized byte[] getByteResult() {
+ return this.buffer.toString().getBytes();
}
/**
* close() will stop the connection first. Then it closes the subscriber,
- * session and connection and sets them to null.
+ * session and connection.
*/
- public synchronized void close() {
+ public synchronized void close() { // called from testEnded() thread
try {
+ this.RUN = false;
this.CONN.stop();
this.SUBSCRIBER.close();
this.SESSION.close();
this.CONN.close();
- this.SUBSCRIBER = null;
- this.SESSION = null;
- this.CONN = null;
- this.RUN = false;
this.CLIENTTHREAD.interrupt();
this.CLIENTTHREAD = null;
this.buffer.setLength(0);
- this.buffer = null;
} catch (JMSException e) {
log.error(e.getMessage());
- } catch (Throwable e) {
+ } catch (Exception e) {
log.error(e.getMessage());
}
}
/**
- * Clear will set the buffer to zero and the result objects to null. Clear
- * should be called at the end of a sample.
+ * Reset the receiver ready for receiving any further messages
*/
- public void clear() {
+ public synchronized void reset() {
+ counter = 0;
this.buffer.setLength(0);
- this.RESULT = null;
}
/**
@@ -207,52 +205,46 @@
}
/**
- * Reset will reset the counter and prepare for the next sample() call.
- *
- */
- public synchronized int resetCount() {
- counter = 0;
- return counter;
- }
-
- /**
* start will create a new thread and pass this class. once the thread is
* created, it calls Thread.start().
*/
public void start() {
+ // No point starting thread unless subscriber exists
+ if (SUBSCRIBER == null) {
+ log.error("Subscriber has not been set up");
+ return;
+ }
this.CLIENTTHREAD = new Thread(this, "Subscriber2");
this.CLIENTTHREAD.start();
}
/**
- * run calls listen to begin listening for inboud messages from the
+ * run calls listen to begin listening for inbound messages from the
* provider.
+ *
+ * Updates the count field so the caller can check how many messages have been receieved.
+ *
*/
public void run() {
- ReceiveSubscriber.this.listen();
- }
-
- /**
- * Listen for inbound messages
- */
- protected void listen() {
- log.info("Subscriber2.listen() called");
+ if (SUBSCRIBER == null) { // just in case
+ log.error("Subscriber has not been set up");
+ return;
+ }
while (RUN) {
- if (SUBSCRIBER == null) {
- log.error("Subscriber has not been set up");
- break;
- }
try {
Message message = this.SUBSCRIBER.receive();
if (message != null && message instanceof TextMessage) {
TextMessage msg = (TextMessage) message;
- if (msg.getText().trim().length() > 0) {
- this.buffer.append(msg.getText());
- count(1);
+ String text = msg.getText();
+ if (text.trim().length() > 0) {
+ synchronized (this) {
+ this.buffer.append(text);
+ count(1);
+ }
}
}
} catch (JMSException e) {
- log.info("Communication error: " + e.getMessage());
+ log.error("Communication error: " + e.getMessage());
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: jmeter-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: jmeter-dev-help@jakarta.apache.org