You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:25:21 UTC
svn commit: r810562 [2/2] - in
/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae:
jmx/monitor/ message/ monitor/ monitor/statistics/ spi/transport/
spi/transport/vm/
Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageDispatcher.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageDispatcher.java?rev=810562&r1=810561&r2=810562&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageDispatcher.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageDispatcher.java Wed Sep 2 15:25:17 2009
@@ -30,18 +30,18 @@
import org.apache.uima.util.Level;
/**
-
- * Uima message implementation of {@link UimaMessageDispatcher}. It uses a Java's Executor framework to
- * pass Uima messages to a collocated Uima AS service. Each message is processed in a seperate thread
- * provided by the Executor.
- *
- * */
+ *
+ * Uima message implementation of {@link UimaMessageDispatcher}. It uses Java's Executor framework
+ * to pass Uima messages to a collocated Uima AS service. Each message is processed in a separate
+ * thread provided by the Executor.
+ *
+ * */
public class UimaVmMessageDispatcher implements UimaMessageDispatcher {
private static final Class<?> CLASS_NAME = UimaVmMessageDispatcher.class;
private ThreadPoolExecutor executor = null;
- // Message listener which will receive a new message
+ // Message listener which will receive a new message
private final UimaMessageListener targetListener;
private String delegateKey;
@@ -52,14 +52,15 @@
delegateKey = aKey;
targetListener = aListener;
}
+
/**
- * This method is responsible for adding a Uima message to a queue which is shared with a
- * collocated service. Each message is processed by the receiving service in a thread
- * provided by the Executor.
+ * This method is responsible for adding a Uima message to a queue which is shared with a
+ * collocated service. Each message is processed by the receiving service in a thread provided by
+ * the Executor.
*/
public void dispatch(final UimaMessage message) {
- if ( executor.isShutdown() || executor.isTerminating() || executor.isShutdown() ) {
- return;
+ if (executor.isShutdown() || executor.isTerminating() || executor.isShutdown()) {
+ return;
}
executor.execute(new Runnable() {
public void run() {
@@ -73,14 +74,17 @@
} catch (Exception e) {
e.printStackTrace();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", new Object[] { e });
}
}
}
});
}
+
public void stop() {
- if ( executor != null ) {
+ if (executor != null) {
executor.purge();
executor.shutdownNow();
}
Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java?rev=810562&r1=810561&r2=810562&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java Wed Sep 2 15:25:17 2009
@@ -73,26 +73,28 @@
int requestType = 0;
try {
latch.await();
- if ( controller != null && controller.isStopped()) {
+ if (controller != null && controller.isStopped()) {
return; // throw away the message, we are stopping
}
if (UimaMessageValidator.isValidMessage(aMessage, controller)) {
MessageContext msgContext = aMessage.toMessageContext(controller.getName());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage",
- UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_new_msg_recvd__FINEST",
- new Object[] { controller.getComponentName(), aMessage.toString() });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_new_msg_recvd__FINEST",
+ new Object[] { controller.getComponentName(), aMessage.toString() });
}
if (!concurrentThreads.containsKey(Thread.currentThread().getId())) {
Thread.currentThread().setName(
- Thread.currentThread().getName() + "::" + controller.getComponentName()+ "::"+Thread.currentThread().getId());
+ Thread.currentThread().getName() + "::" + controller.getComponentName() + "::"
+ + Thread.currentThread().getId());
// Store the thread identifier in the map. The value stored is not important. All
// we want is to save the fact that the thread name has been changed. And we only
// want to change it once
concurrentThreads.put(Thread.currentThread().getId(), Thread.currentThread().getName());
}
requestType = aMessage.getIntProperty(AsynchAEMessage.Command);
- if ( requestType == AsynchAEMessage.Stop ) {
+ if (requestType == AsynchAEMessage.Stop) {
return;
}
// Determine if this message is a request and either GetMeta, CPC, or Process
@@ -101,17 +103,17 @@
if (doCheckpoint) {
controller.beginProcess(requestType);
}
- // Process the message.
+ // Process the message.
handler.handle(msgContext);
}
- } catch( InterruptedException e) {
+ } catch (InterruptedException e) {
System.out.println("VMTransport Latch Interrupted - Processor is Stopping");
} catch (Exception e) {
e.printStackTrace();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
- "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_exception__WARNING", new Object[] { e });
+ "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", new Object[] { e });
}
} finally {
// Call the end checkpoint for non-aggregates. For primitives the CAS has been fully processed
@@ -121,6 +123,7 @@
}
}
}
+
/**
* Initializes this listener. Instantiates and links message handlers.
*/
Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java?rev=810562&r1=810561&r2=810562&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java Wed Sep 2 15:25:17 2009
@@ -3,62 +3,69 @@
import java.util.concurrent.LinkedBlockingQueue;
/**
- * This is a JMX wrapper around the {@link LinkedBlockingQueue}. It exposes
- * the following queue statistics:
+ * This is a JMX wrapper around the {@link LinkedBlockingQueue}. It exposes the following queue
+ * statistics:
* <ul>
- * <li>size - the number of items on the queue</li>
- * <li>consumerCount - number of concurrent consuming threads taking items from this queue.</li>
- * <li>dequeueCount - total number of items consumed so far </li>
+ * <li>size - the number of items on the queue</li>
+ * <li>consumerCount - number of concurrent consuming threads taking items from this queue.</li>
+ * <li>dequeueCount - total number of items consumed so far</li>
* </ul>
- *
+ *
*/
-public class UimaVmQueue extends LinkedBlockingQueue<Runnable>
-implements UimaVmQueueMBean {
+public class UimaVmQueue extends LinkedBlockingQueue<Runnable> implements UimaVmQueueMBean {
private static final long serialVersionUID = 1L;
+
private int consumerCount = 0;
+
private long dequeueCount = 0;
-
- public UimaVmQueue( int size ) {
- // super(size);
+
+ public UimaVmQueue(int size) {
+ // super(size);
}
-
+
/**
* Returns the current number of items in the queue.
- */
+ */
public int getQueueSize() {
return super.size();
}
+
/**
* Returns total number of items dequeued so far
*/
public long getDequeueCount() {
return dequeueCount;
}
+
/**
- * Override of the method in the super class to enable counting of items
- * taken (dequeued) off the queue.
+ * Override of the method in the super class to enable counting of items taken (dequeued) off the
+ * queue.
*/
public Runnable take() throws InterruptedException {
- Runnable work = super.take();
- if ( work != null ) {
+ Runnable work = super.take();
+ if (work != null) {
dequeueCount++;
}
return work;
}
+
/**
- * Returns total number of concurrent threads consuming work from this
- * queue.
+ * Returns total number of concurrent threads consuming work from this queue.
*/
public int getConsumerCount() {
return consumerCount;
}
+
/**
* Sets the number of concurrent threads consuming work from this queue
- * @param aConsumerCount - number of consuming threads
+ *
+ * @param aConsumerCount
+ * - number of consuming threads
*/
public void setConsumerCount(int aConsumerCount) {
consumerCount = aConsumerCount;
}
+
/**
* Resets both the queue size and dequeue count to zero
*/
@@ -66,5 +73,5 @@
super.clear();
dequeueCount = 0;
}
-
+
}
Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java?rev=810562&r1=810561&r2=810562&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java Wed Sep 2 15:25:17 2009
@@ -2,11 +2,14 @@
import java.io.Serializable;
-public interface UimaVmQueueMBean extends Serializable{
-
+public interface UimaVmQueueMBean extends Serializable {
+
public int getQueueSize();
+
public int getConsumerCount();
+
public long getDequeueCount();
+
public void reset();
-
+
}
Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java?rev=810562&r1=810561&r2=810562&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java Wed Sep 2 15:25:17 2009
@@ -50,10 +50,10 @@
import org.apache.uima.util.Level;
/**
- * This class provides implementation for internal messaging between collocated Uima AS services.
- * It uses {@link UimaMessageDispatcher} to send messages to {@link UimaMessageListener}.
+ * This class provides implementation for internal messaging between collocated Uima AS services. It
+ * uses {@link UimaMessageDispatcher} to send messages to {@link UimaMessageListener}.
+ *
*
- *
*/
public class VmTransport implements UimaTransport {
private static final Class CLASS_NAME = VmTransport.class;
@@ -65,8 +65,9 @@
private ThreadPoolExecutor executor = null;
private ThreadGroup threadGroup = null;
- // Create a queue for work items. The queue has a JMX wrapper to expose the
- // size.
+
+ // Create a queue for work items. The queue has a JMX wrapper to expose the
+ // size.
private BlockingQueue<Runnable> workQueue = null;
private UimaVmMessageDispatcher dispatcher;
@@ -76,19 +77,21 @@
private AnalysisEngineController controller;
private UimaAsContext context;
-
+
private AtomicBoolean stopping = new AtomicBoolean(false);
public VmTransport(UimaAsContext aContext, AnalysisEngineController aController) {
context = aContext;
UIDGenerator idGenerator = new UIDGenerator();
controller = aController;
- threadGroup = new ThreadGroup("VmThreadGroup"+idGenerator.nextId()+"_"+controller.getComponentName());
+ threadGroup = new ThreadGroup("VmThreadGroup" + idGenerator.nextId() + "_"
+ + controller.getComponentName());
}
public void addSpiListener(SpiListener listener) {
spiListeners.add(listener);
}
+
public UimaVmMessage produceMessage() {
return new UimaVmMessage();
}
@@ -119,50 +122,51 @@
}
public synchronized void stopIt() throws UimaSpiException {
- if ( stopping.get() == true ) {
+ if (stopping.get() == true) {
return;
}
stopping.set(true);
executor.purge();
executor.shutdownNow();
workQueue.clear();
- Set <Entry<String, UimaVmMessageDispatcher>> set = dispatchers.entrySet();
- for( Entry<String, UimaVmMessageDispatcher> entry: set) {
+ Set<Entry<String, UimaVmMessageDispatcher>> set = dispatchers.entrySet();
+ for (Entry<String, UimaVmMessageDispatcher> entry : set) {
UimaVmMessageDispatcher dispatcher = entry.getValue();
dispatcher.stop();
}
- if ( threadGroup != null ) {
- // Spin a thread where we wait for threads in the threadGroup to stop.
- new Thread(threadGroup.getParent(),threadGroup.getName()+":Reaper") {
- public void run() {
-
- while ( threadGroup.activeCount() > 0) {
- synchronized(this) {
- try {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- threadGroup.list();
- }
- wait(1000);
- } catch( InterruptedException ex) {
-
+ if (threadGroup != null) {
+ // Spin a thread where we wait for threads in the threadGroup to stop.
+ new Thread(threadGroup.getParent(), threadGroup.getName() + ":Reaper") {
+ public void run() {
+
+ while (threadGroup.activeCount() > 0) {
+ synchronized (this) {
+ try {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ threadGroup.list();
}
+ wait(1000);
+ } catch (InterruptedException ex) {
+
}
}
- try {
- threadGroup.destroy();
- } catch ( Exception e) {
- } finally {
- threadGroup = null;
- }
}
- }.start();
+ try {
+ threadGroup.destroy();
+ } catch (Exception e) {
+ } finally {
+ threadGroup = null;
+ }
+ }
+ }.start();
}
}
+
public void destroy() {
try {
stopIt();
- } catch ( Exception e) {
-
+ } catch (Exception e) {
+
}
}
@@ -170,12 +174,13 @@
if (executor == null) {
int concurrentConsumerCount = context.getConcurrentConsumerCount();
workQueue = new UimaVmQueue(concurrentConsumerCount);
- // Create a ThreadPoolExecutor with as many threads as needed. The pool has
+ // Create a ThreadPoolExecutor with as many threads as needed. The pool has
// a fixed number of threads that never expire and are never passivated.
- executor = new ThreadPoolExecutor(concurrentConsumerCount, concurrentConsumerCount, Long.MAX_VALUE,
- TimeUnit.NANOSECONDS, workQueue);
- if ( controller instanceof PrimitiveAnalysisEngineController ) {
- ThreadFactory tf = new UimaAsThreadFactory(threadGroup, (PrimitiveAnalysisEngineController)controller);
+ executor = new ThreadPoolExecutor(concurrentConsumerCount, concurrentConsumerCount,
+ Long.MAX_VALUE, TimeUnit.NANOSECONDS, workQueue);
+ if (controller instanceof PrimitiveAnalysisEngineController) {
+ ThreadFactory tf = new UimaAsThreadFactory(threadGroup,
+ (PrimitiveAnalysisEngineController) controller);
executor.setThreadFactory(tf);
executor.prestartAllCoreThreads();
}
@@ -183,16 +188,20 @@
return executor;
}
- public void registerWithJMX(AnalysisEngineController aController, String queueKind /* ReplyQueue or InputQueue */) {
+ public void registerWithJMX(AnalysisEngineController aController, String queueKind /*
+ * ReplyQueue
+ * or
+ * InputQueue
+ */) {
try {
- ((UimaVmQueue)workQueue).setConsumerCount(context.getConcurrentConsumerCount());
+ ((UimaVmQueue) workQueue).setConsumerCount(context.getConcurrentConsumerCount());
aController.registerVmQueueWithJMX(workQueue, queueKind);
- } catch( Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
}
-
}
+
public UimaMessageDispatcher getMessageDispatcher() throws UimaSpiException {
return dispatcher;
}
@@ -201,8 +210,7 @@
return listener;
}
- public UimaMessageListener produceUimaMessageListener()
- throws UimaSpiException {
+ public UimaMessageListener produceUimaMessageListener() throws UimaSpiException {
listener = new UimaVmMessageListener(controller);
return listener;
}
@@ -214,18 +222,19 @@
public UimaMessageDispatcher getUimaMessageDispatcher(String aKey) throws UimaSpiException {
return dispatchers.get(aKey);
}
- public UimaVmMessageDispatcher produceUimaMessageDispatcher(UimaTransport aTransport) throws UimaSpiException {
+
+ public UimaVmMessageDispatcher produceUimaMessageDispatcher(UimaTransport aTransport)
+ throws UimaSpiException {
UimaVmMessageDispatcher dispatcher = null;
String endpointName = (String) context.get("EndpointName");
- if (!controller.isTopLevelComponent() ) {
+ if (!controller.isTopLevelComponent()) {
endpointName = controller.getParentController().getName();
}
dispatcher = new UimaVmMessageDispatcher(((VmTransport) aTransport).getExecutorInstance(),
((VmTransport) aTransport).getUimaMessageListener(), endpointName);
- if ( controller instanceof AggregateAnalysisEngineController ||
- ((VmTransport) aTransport).controller instanceof AggregateAnalysisEngineController
- ) {
- // Store the dispatcher under delegate's name
+ if (controller instanceof AggregateAnalysisEngineController
+ || ((VmTransport) aTransport).controller instanceof AggregateAnalysisEngineController) {
+ // Store the dispatcher under delegate's name
dispatchers.put(((VmTransport) aTransport).controller.getName(), dispatcher);
} else {
dispatchers.put(controller.getName(), dispatcher);