You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/12/01 18:04:56 UTC

svn commit: r722133 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae: controller/ spi/transport/ spi/transport/vm/

Author: eae
Date: Mon Dec  1 09:04:55 2008
New Revision: 722133

URL: http://svn.apache.org/viewvc?rev=722133&view=rev
Log:
UIMA-1231 commit uimaj-as-activemq-UIMA-1231-patch.txt and uimaj-as-core-UIMA-1231-patch.txt

Added:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java
Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/UimaTransport.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=722133&r1=722132&r2=722133&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Mon Dec  1 09:04:55 2008
@@ -201,4 +201,5 @@
   public InputChannel getReplyInputChannel(String aDelegateKey);
   
   public LocalCache getLocalCache();
+  public void registerVmQueueWithJMX( Object o, String aName ) throws Exception; 
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=722133&r1=722132&r2=722133&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Mon Dec  1 09:04:55 2008
@@ -488,6 +488,10 @@
        parentListener.initialize(uimaAsContext2);
        // Creates delegate's dispatcher. It is wired to send replies to the parent's listener.
        vmTransport.produceUimaMessageDispatcher(parentController,parentVmTransport);
+       // Register input queue with JMX. This is an internal (non-jms) queue where clients
+       // send requests to this service.
+       vmTransport.registerWithJMX(this, "VmInputQueue");
+       parentVmTransport.registerWithJMX( this, "VmReplyQueue");
      }
 
 	 }
@@ -652,6 +656,17 @@
       }
 		}
 	}
+	public void registerVmQueueWithJMX( Object o, String aName ) throws Exception {
+	  String jmxName = getManagementInterface().getJmxDomain()
+            +jmxContext+",name="+getComponentName()+"_"+aName;
+	  registerWithAgent( o, jmxName);
+	  
+	  if ( "VmReplyQueue".equals( aName )) {
+	    getServiceInfo().setReplyQueueName(jmxName);
+	  } else {
+	    getServiceInfo().setInputQueueName(jmxName);
+	  }
+	}
 	protected void registerServiceWithJMX(String key_value_list, boolean remote) 
 	{
 		String thisComponentName = getComponentName();

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/UimaTransport.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/UimaTransport.java?rev=722133&r1=722132&r2=722133&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/UimaTransport.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/UimaTransport.java Mon Dec  1 09:04:55 2008
@@ -90,4 +90,5 @@
   public UimaVmMessage produceMessage();
 
   public UimaVmMessage produceMessage(int aCommand, int aMessageType, String aMessageFrom);
+  public void registerWithJMX(AnalysisEngineController aController, String queueKind);
 }

Added: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java?rev=722133&view=auto
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java (added)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java Mon Dec  1 09:04:55 2008
@@ -0,0 +1,70 @@
+package org.apache.uima.aae.spi.transport.vm;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This is a JMX wrapper around the {@link LinkedBlockingQueue}. It exposes
+ * the following queue statistics:
+ * <ul>
+ *    <li>size - the number of items on the queue</li>
+ *    <li>consumerCount - number of concurrent consuming threads taking items from this queue.</li>
+ *    <li>dequeueCount - total number of items consumed so far </li>   
+ * </ul>
+ *
+ */
+public class UimaVmQueue extends LinkedBlockingQueue<Runnable> 
+implements UimaVmQueueMBean {
+  private static final long serialVersionUID = 1L;
+  private int consumerCount = 0;
+  private long dequeueCount = 0;
+  
+  public UimaVmQueue( int size ) {
+   // super(size);
+  }
+ 
+  /**
+   * Returns the current number of items in the queue.
+   */  
+  public int getQueueSize() {
+    return super.size();
+  }
+  /**
+   * Returns total number of items dequeued so far
+   */
+  public long getDequeueCount() {
+    return dequeueCount;
+  }
+  /**
+   * Override of the method in the super class to enable counting of items
+   * taken (dequeued) off the queue.
+   */
+  public Runnable take() throws InterruptedException {
+    Runnable work =  super.take();
+    if ( work != null ) {
+      dequeueCount++;
+    }
+    return work;
+  }
+  /**
+   * Returns total number of concurrent threads consuming work from this
+   * queue.  
+   */
+  public int getConsumerCount() {
+    return consumerCount;
+  }
+  /**
+   * Sets the number of concurrent threads consuming work from this queue
+   * @param aConsumerCount - number of consuming threads
+   */
+  public void setConsumerCount(int aConsumerCount) {
+    consumerCount = aConsumerCount;
+  }
+  /**
+   * Resets both the queue size and dequeue count to zero
+   */
+  public void reset() {
+    super.clear();
+    dequeueCount = 0;
+  }
+  
+}

Added: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java?rev=722133&view=auto
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java (added)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java Mon Dec  1 09:04:55 2008
@@ -0,0 +1,12 @@
+package org.apache.uima.aae.spi.transport.vm;
+
+import java.io.Serializable;
+
+public interface UimaVmQueueMBean extends Serializable{
+  
+  public int getQueueSize();
+  public int getConsumerCount();
+  public long getDequeueCount();
+  public void reset();
+  
+}

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java?rev=722133&r1=722132&r2=722133&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java Mon Dec  1 09:04:55 2008
@@ -51,7 +51,9 @@
 
   private ThreadPoolExecutor executor = null;
 
-  private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+  //  Create a queue for work items. The queue has a JMX wrapper to expose the 
+  //  size.
+  private BlockingQueue<Runnable> workQueue = null;
 
   private VmTransport vmConnector;
 
@@ -122,6 +124,7 @@
   protected ThreadPoolExecutor getExecutorInstance() {
     if (executor == null) {
       int concurrentConsumerCount = context.getConcurrentConsumerCount();
+      workQueue = new UimaVmQueue(concurrentConsumerCount);
       // Create a ThreadPoolExecutor with as many threads as needed. The pool has 
       // a fixed number of threads that never expire and are never passivated.
       executor = new ThreadPoolExecutor(concurrentConsumerCount, concurrentConsumerCount, Long.MAX_VALUE,
@@ -131,6 +134,16 @@
     return executor;
   }
 
+  public void registerWithJMX(AnalysisEngineController aController, String queueKind /* ReplyQueue or InputQueue */) {
+    try {
+      ((UimaVmQueue)workQueue).setConsumerCount(context.getConcurrentConsumerCount());
+      aController.registerVmQueueWithJMX(workQueue, queueKind);
+    } catch( Exception e) {
+      e.printStackTrace();
+    }
+
+
+  }
   public UimaMessageDispatcher getMessageDispatcher() throws UimaSpiException {
     return dispatcher;
   }