You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/12/01 18:04:56 UTC
svn commit: r722133 - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae:
controller/ spi/transport/ spi/transport/vm/
Author: eae
Date: Mon Dec 1 09:04:55 2008
New Revision: 722133
URL: http://svn.apache.org/viewvc?rev=722133&view=rev
Log:
UIMA-1231 commit uimaj-as-activemq-UIMA-1231-patch.txt and uimaj-as-core-UIMA-1231-patch.txt
Added:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/UimaTransport.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=722133&r1=722132&r2=722133&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Mon Dec 1 09:04:55 2008
@@ -201,4 +201,5 @@
public InputChannel getReplyInputChannel(String aDelegateKey);
public LocalCache getLocalCache();
+ public void registerVmQueueWithJMX( Object o, String aName ) throws Exception;
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=722133&r1=722132&r2=722133&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Mon Dec 1 09:04:55 2008
@@ -488,6 +488,10 @@
parentListener.initialize(uimaAsContext2);
// Creates delegate's dispatcher. It is wired to send replies to the parent's listener.
vmTransport.produceUimaMessageDispatcher(parentController,parentVmTransport);
+ // Register both internal (non-jms) queues with JMX: this transport's queue, where clients
+ // send requests to this service, and the parent transport's queue, registered as the reply queue.
+ vmTransport.registerWithJMX(this, "VmInputQueue");
+ parentVmTransport.registerWithJMX( this, "VmReplyQueue");
}
}
@@ -652,6 +656,17 @@
}
}
}
+ public void registerVmQueueWithJMX( Object o, String aName ) throws Exception { // Registers the queue object o via registerWithAgent() under a JMX name derived from aName
+ String jmxName = getManagementInterface().getJmxDomain() // name layout: <jmx domain><jmxContext>,name=<component name>_<aName>
+ +jmxContext+",name="+getComponentName()+"_"+aName;
+ registerWithAgent( o, jmxName);
+ // Record the JMX name in the service info: as the reply queue name only when aName is exactly "VmReplyQueue".
+ if ( "VmReplyQueue".equals( aName )) {
+ getServiceInfo().setReplyQueueName(jmxName);
+ } else { // any other aName (the caller in this commit passes "VmInputQueue") is recorded as the input queue name
+ getServiceInfo().setInputQueueName(jmxName);
+ }
+ }
protected void registerServiceWithJMX(String key_value_list, boolean remote)
{
String thisComponentName = getComponentName();
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/UimaTransport.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/UimaTransport.java?rev=722133&r1=722132&r2=722133&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/UimaTransport.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/UimaTransport.java Mon Dec 1 09:04:55 2008
@@ -90,4 +90,5 @@
public UimaVmMessage produceMessage();
public UimaVmMessage produceMessage(int aCommand, int aMessageType, String aMessageFrom);
+ public void registerWithJMX(AnalysisEngineController aController, String queueKind);
}
Added: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java?rev=722133&view=auto
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java (added)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java Mon Dec 1 09:04:55 2008
@@ -0,0 +1,70 @@
+package org.apache.uima.aae.spi.transport.vm;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A {@link LinkedBlockingQueue} of {@link Runnable} work items that also implements
+ * {@link UimaVmQueueMBean}, so that the following queue statistics can be exposed via JMX:
+ * <ul>
+ * <li>queueSize - the number of items currently on the queue</li>
+ * <li>consumerCount - number of concurrent consuming threads, as last set via {@link #setConsumerCount(int)}</li>
+ * <li>dequeueCount - total number of items removed via {@link #take()} since creation or the last {@link #reset()}</li>
+ * </ul>
+ * NOTE(review): both counters are plain fields updated without synchronization - confirm this is acceptable when several threads call take().
+ */
+public class UimaVmQueue extends LinkedBlockingQueue<Runnable>
+implements UimaVmQueueMBean {
+ private static final long serialVersionUID = 1L;
+ private int consumerCount = 0; // reported by getConsumerCount(); only changed by setConsumerCount()
+ private long dequeueCount = 0; // incremented in take(); zeroed by reset()
+
+ public UimaVmQueue( int size ) {
+ // super(size); -- disabled: the size argument is ignored and the default LinkedBlockingQueue constructor (capacity Integer.MAX_VALUE) is used
+ }
+
+ /**
+ * Returns the current number of items in the queue, as reported by {@code size()} of the super class.
+ */
+ public int getQueueSize() {
+ return super.size();
+ }
+ /**
+ * Returns the total number of items dequeued via {@link #take()} since creation or the last {@link #reset()}.
+ */
+ public long getDequeueCount() {
+ return dequeueCount;
+ }
+ /**
+ * Delegates to {@code take()} of the super class and increments the dequeue count for each item returned.
+ * Only this method is counted; items removed in other ways (e.g. poll, remove, drainTo) are not.
+ */
+ public Runnable take() throws InterruptedException {
+ Runnable work = super.take();
+ if ( work != null ) {
+ dequeueCount++;
+ }
+ return work;
+ }
+ /**
+ * Returns the number of concurrent threads consuming work from this queue. This is the value
+ * last passed to {@link #setConsumerCount(int)} (initially 0); it is not measured by the queue.
+ */
+ public int getConsumerCount() {
+ return consumerCount;
+ }
+ /**
+ * Sets the number of concurrent threads consuming work from this queue.
+ * @param aConsumerCount - number of consuming threads
+ */
+ public void setConsumerCount(int aConsumerCount) {
+ consumerCount = aConsumerCount;
+ }
+ /**
+ * Removes all items currently on the queue (so the queue size becomes zero) and resets the dequeue count to zero.
+ */
+ public void reset() {
+ super.clear();
+ dequeueCount = 0;
+ }
+
+}
Added: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java?rev=722133&view=auto
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java (added)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java Mon Dec 1 09:04:55 2008
@@ -0,0 +1,12 @@
+package org.apache.uima.aae.spi.transport.vm;
+
+import java.io.Serializable;
+
+public interface UimaVmQueueMBean extends Serializable{ // management interface implemented by UimaVmQueue
+ // Three read-only statistics followed by one operation; the descriptions below reflect the UimaVmQueue implementation.
+ public int getQueueSize(); // number of items currently on the queue
+ public int getConsumerCount(); // number of consuming threads, as set on the queue by its owner
+ public long getDequeueCount(); // total number of items taken off the queue so far
+ public void reset(); // empties the queue and zeroes the dequeue count
+
+}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java?rev=722133&r1=722132&r2=722133&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java Mon Dec 1 09:04:55 2008
@@ -51,7 +51,9 @@
private ThreadPoolExecutor executor = null;
- private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+ // Queue for work items. It is created in getExecutorInstance() as a UimaVmQueue, a JMX
+ // wrapper that exposes queue statistics such as the size.
+ private BlockingQueue<Runnable> workQueue = null;
private VmTransport vmConnector;
@@ -122,6 +124,7 @@
protected ThreadPoolExecutor getExecutorInstance() {
if (executor == null) {
int concurrentConsumerCount = context.getConcurrentConsumerCount();
+ workQueue = new UimaVmQueue(concurrentConsumerCount);
// Create a ThreadPoolExecutor with as many threads as needed. The pool has
// a fixed number of threads that never expire and are never passivated.
executor = new ThreadPoolExecutor(concurrentConsumerCount, concurrentConsumerCount, Long.MAX_VALUE,
@@ -131,6 +134,16 @@
return executor;
}
+ public void registerWithJMX(AnalysisEngineController aController, String queueKind /* "VmInputQueue" or "VmReplyQueue", as passed by BaseAnalysisEngineController */) {
+ try { // NOTE(review): workQueue is assigned in getExecutorInstance(); if it is still null here, the NullPointerException is caught below - confirm call order
+ ((UimaVmQueue)workQueue).setConsumerCount(context.getConcurrentConsumerCount()); // copy the configured consumer count onto the queue so it can be reported
+ aController.registerVmQueueWithJMX(workQueue, queueKind); // the controller builds the JMX name and performs the registration
+ } catch( Exception e) { // any failure is only printed to stderr; nothing is propagated to the caller
+ e.printStackTrace();
+ }
+
+
+ }
public UimaMessageDispatcher getMessageDispatcher() throws UimaSpiException { // accessor for this transport's message dispatcher
return dispatcher; // returned as-is; nothing in this body throws UimaSpiException
}