You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ma...@apache.org on 2006/11/28 16:29:17 UTC
svn commit: r480089 - in
/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher:
FileMessageDispatcher.java FileMessageFactory.java MessageFactory.java
MonitorMessageDispatcher.java Publisher.java
Author: marnie
Date: Tue Nov 28 07:29:16 2006
New Revision: 480089
URL: http://svn.apache.org/viewvc?view=rev&rev=480089
Log:
Further example tidy up
Added:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java (with props)
Removed:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java?view=diff&rev=480089&r1=480088&r2=480089
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java Tue Nov 28 07:29:16 2006
@@ -19,7 +19,7 @@
package org.apache.qpid.example.publisher;
import org.apache.log4j.Logger;
-import java.util.Properties;
+
import java.io.File;
import org.apache.qpid.example.shared.FileUtils;
@@ -34,12 +34,17 @@
*/
public class FileMessageDispatcher {
- private static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class);
-
- private static Publisher _publisher = null;
+ protected static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class);
- private static final String DEFAULT_PUB_NAME = "Publisher";
+ protected static Publisher _publisher = null;
+ /**
+ * To use this main method you need to specify a path or file to use for input
+ * This class then uses file contents from the dir/file specified to generate
+ * messages to publish
+ * Intended to be a very simple way to get going with publishing using the broker
+ * @param args - must specify one value, the path to file(s) for publisher
+ */
public static void main(String[] args)
{
@@ -52,7 +57,7 @@
{
try
{
- //publish message(s) from file(s) and send message to monitor queue
+ //publish message(s) from file(s) to configured queue
publish(args[0]);
//Move payload file(s) to archive location as no error
@@ -60,7 +65,8 @@
}
catch(Exception e)
{
- System.err.println("Error trying to dispatch message: " + e);
+ //log error and exit
+ _logger.error("Error trying to dispatch message: " + e);
System.exit(1);
}
finally
@@ -81,8 +87,12 @@
System.exit(0);
}
-
- //Publish files or file as message
+ /**
+ * Publish the content of a file or files from a directory as messages
+ * @param path - from main args
+ * @throws JMSException
+ * @throws MessageFactoryException - if cannot create message from file content
+ */
public static void publish(String path) throws JMSException, MessageFactoryException
{
File tempFile = new File(path);
@@ -100,7 +110,7 @@
for (File file : files)
{
//Create message factory passing in payload path
- MessageFactory factory = new MessageFactory(getPublisher().getSession(), file.toString());
+ FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString());
//Send the message generated from the payload using the _publisher
getPublisher().sendMessage(factory.createEventMessage());
@@ -110,16 +120,18 @@
}
else
{
- //handle as single file
+ //handle a single file
//Create message factory passing in payload path
- MessageFactory factory = new MessageFactory(getPublisher().getSession(),tempFile.toString());
+ FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(),tempFile.toString());
//Send the message generated from the payload using the _publisher
getPublisher().sendMessage(factory.createEventMessage());
}
}
- //cleanup publishers
+ /**
+ * Cleanup before exit
+ */
public static void cleanup()
{
if (getPublisher() != null)
@@ -128,8 +140,8 @@
}
}
- /*
- * Returns a _publisher for a queue
+ /**
+ * @return A Publisher instance
*/
private static Publisher getPublisher()
{
@@ -141,7 +153,6 @@
//Create a _publisher
_publisher = new Publisher();
- _publisher.setName(DEFAULT_PUB_NAME);
return _publisher;
}
Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java?view=auto&rev=480089
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java Tue Nov 28 07:29:16 2006
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.example.publisher;
+
+import org.apache.qpid.example.shared.FileUtils;
+import org.apache.qpid.example.shared.Statics;
+
+import java.io.*;
+import javax.jms.*;
+
+public class FileMessageFactory
+{
+ protected final Session _session;
+ protected final String _payload;
+ protected final String _filename;
+
+ /**
+ * Constructs an instance using a filename from which content will be used to create a message
+ * @param session
+ * @param filename
+ * @throws MessageFactoryException
+ */
+ public FileMessageFactory(Session session, String filename) throws MessageFactoryException
+ {
+ try
+ {
+ _filename = filename;
+ _payload = FileUtils.getFileContent(filename);
+ _session = session;
+ }
+ catch (IOException e)
+ {
+ throw new MessageFactoryException(e.toString());
+ }
+ }
+
+ /**
+ * Creates a text message and sets filename property on it
+ * The filename property is purely intended to provide visibility
+ * of file content passing through the broker using example classes
+ * @return Message - a TextMessage with content from file
+ * @throws JMSException
+ */
+ public Message createEventMessage() throws JMSException
+ {
+ TextMessage msg = _session.createTextMessage();
+ msg.setText(_payload);
+ msg.setStringProperty(Statics.FILENAME_PROPERTY,new File(_filename).getName());
+ return msg;
+ }
+
+ /**
+ * Creates message from a string for use by the monitor
+ * @param session
+ * @param textMsg - message content
+ * @return Message - TextMessage with content from String
+ * @throws JMSException
+ */
+ public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException
+ {
+ TextMessage msg = session.createTextMessage();
+ msg.setText(textMsg);
+ return msg;
+ }
+
+ public Message createShutdownMessage() throws JMSException
+ {
+ return _session.createTextMessage("SHUTDOWN");
+ }
+
+ public Message createReportRequestMessage() throws JMSException
+ {
+ return _session.createTextMessage("REPORT");
+ }
+
+ public Message createReportResponseMessage(String msg) throws JMSException
+ {
+ return _session.createTextMessage(msg);
+ }
+
+ public boolean isShutdown(Message m)
+ {
+ return checkText(m, "SHUTDOWN");
+ }
+
+ public boolean isReport(Message m)
+ {
+ return checkText(m, "REPORT");
+ }
+
+ public Object getReport(Message m)
+ {
+ try
+ {
+ return ((TextMessage) m).getText();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+ return e.toString();
+ }
+ }
+
+ private static boolean checkText(Message m, String s)
+ {
+ try
+ {
+ return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+ return false;
+ }
+ }
+}
+
Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java?view=diff&rev=480089&r1=480088&r2=480089
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java Tue Nov 28 07:29:16 2006
@@ -20,8 +20,9 @@
import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;
-import org.apache.qpid.example.shared.Statics;
+
import javax.jms.*;
+
import java.util.Properties;
/**
@@ -32,14 +33,18 @@
private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class);
- private static MonitorPublisher _monitorPublisher = null;
+ protected static MonitorPublisher _monitorPublisher = null;
- private static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher";
+ protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher";
+ /**
+ * Easy entry point for running a message dispatcher for monitoring consumption
+ * @param args
+ */
public static void main(String[] args)
{
- //@TODO switch on logging appropriately at your app level
+ //Switch on logging appropriately for your app
BasicConfigurator.configure();
try
@@ -61,7 +66,7 @@
}
catch(UndeliveredMessageException a)
{
- //@TODO trigger application specific failure handling here
+ //trigger application specific failure handling here
_logger.error("Problem delivering monitor message");
break;
}
@@ -69,8 +74,7 @@
}
catch(Exception e)
{
-
- System.err.println("Error trying to dispatch AMS monitor message: " + e);
+ _logger.error("Error trying to dispatch AMS monitor message: " + e);
System.exit(1);
}
finally
@@ -84,15 +88,21 @@
System.exit(1);
}
- //Publish heartbeat message
+ /**
+ * Publish heartbeat message
+ * @throws JMSException
+ * @throws UndeliveredMessageException
+ */
public static void publish() throws JMSException, UndeliveredMessageException
{
//Send the message generated from the payload using the _publisher
getMonitorPublisher().sendImmediateMessage
- (MessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis()));
+ (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis()));
}
- //cleanup publishers
+ /**
+ * Cleanup publishers
+ */
public static void cleanup()
{
if (getMonitorPublisher() != null)
@@ -113,9 +123,6 @@
{
return _monitorPublisher;
}
-
- //Create _publisher using system properties
- Properties props = System.getProperties();
//Create a _publisher using failover details and constant for monitor queue
_monitorPublisher = new MonitorPublisher();
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java?view=diff&rev=480089&r1=480088&r2=480089
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java Tue Nov 28 07:29:16 2006
@@ -22,14 +22,14 @@
import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.jms.Session;
-
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.DeliveryMode;
import javax.jms.Queue;
import javax.jms.MessageProducer;
import javax.jms.Connection;
+import javax.jms.Session;
+
import javax.naming.InitialContext;
import org.apache.qpid.example.shared.InitialContextHelper;
@@ -44,7 +44,7 @@
protected Session _session;
- private MessageProducer _producer;
+ protected MessageProducer _producer;
protected String _destinationDir;
@@ -54,7 +54,10 @@
protected static final String _defaultDestinationDir = "/tmp";
- //constructor for use with a single host
+ /**
+ * Creates a Publisher instance using properties from example.properties
+ * See InitialContextHelper for details of how context etc created
+ */
public Publisher()
{
try
@@ -68,7 +71,7 @@
_connection = cf.createConnection();
//create a transactional session
- _session = (Session) _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//lookup the example queue and use it
//Queue is non-exclusive and not deleted when last consumer detaches
@@ -90,8 +93,9 @@
}
/**
- * Publishes a non-persistent message using transacted session
- **/
+ * Publishes a non-persistent message using transacted session
+ * Note that persistent is the default mode for send - so need to specify for transient
+ */
public boolean sendMessage(Message message)
{
try
@@ -124,6 +128,9 @@
return true;
}
+ /**
+ * Cleanup resources before exit
+ */
public void cleanup()
{
try
@@ -138,11 +145,15 @@
}
catch(Exception e)
{
- System.err.println("Error trying to cleanup publisher " + e);
+ _log.error("Error trying to cleanup publisher " + e);
System.exit(1);
}
}
+ /**
+ * Exposes session
+ * @return Session
+ */
public Session getSession()
{
return _session;