You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:25:21 UTC

svn commit: r810562 [2/2] - in /incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae: jmx/monitor/ message/ monitor/ monitor/statistics/ spi/transport/ spi/transport/vm/

Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageDispatcher.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageDispatcher.java?rev=810562&r1=810561&r2=810562&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageDispatcher.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageDispatcher.java Wed Sep  2 15:25:17 2009
@@ -30,18 +30,18 @@
 import org.apache.uima.util.Level;
 
 /**
-
- * Uima message implementation of {@link UimaMessageDispatcher}. It uses a Java's Executor framework to
- * pass Uima messages to a collocated Uima AS service. Each message is processed in a seperate thread
- * provided by the Executor.
- *  
- *  */
+ * 
+ * Uima message implementation of {@link UimaMessageDispatcher}. It uses a Java's Executor framework
+ * to pass Uima messages to a collocated Uima AS service. Each message is processed in a seperate
+ * thread provided by the Executor.
+ * 
+ * */
 public class UimaVmMessageDispatcher implements UimaMessageDispatcher {
   private static final Class<?> CLASS_NAME = UimaVmMessageDispatcher.class;
 
   private ThreadPoolExecutor executor = null;
 
-  //  Message listener which will receive a new message
+  // Message listener which will receive a new message
   private final UimaMessageListener targetListener;
 
   private String delegateKey;
@@ -52,14 +52,15 @@
     delegateKey = aKey;
     targetListener = aListener;
   }
+
   /**
-   * This method is responsible for adding a Uima message to a queue which is shared with a 
-   * collocated service. Each message is processed by the receiving service in a thread 
-   * provided by the Executor.
+   * This method is responsible for adding a Uima message to a queue which is shared with a
+   * collocated service. Each message is processed by the receiving service in a thread provided by
+   * the Executor.
    */
   public void dispatch(final UimaMessage message) {
-    if ( executor.isShutdown() || executor.isTerminating() || executor.isShutdown() ) {
-      return; 
+    if (executor.isShutdown() || executor.isTerminating() || executor.isShutdown()) {
+      return;
     }
     executor.execute(new Runnable() {
       public void run() {
@@ -73,14 +74,17 @@
         } catch (Exception e) {
           e.printStackTrace();
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                    "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAEE_exception__WARNING", new Object[] { e });
           }
         }
       }
     });
   }
+
   public void stop() {
-    if ( executor != null ) {
+    if (executor != null) {
       executor.purge();
       executor.shutdownNow();
     }

Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java?rev=810562&r1=810561&r2=810562&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java Wed Sep  2 15:25:17 2009
@@ -73,26 +73,28 @@
     int requestType = 0;
     try {
       latch.await();
-      if ( controller != null && controller.isStopped()) {
+      if (controller != null && controller.isStopped()) {
         return; // throw away the message, we are stopping
       }
       if (UimaMessageValidator.isValidMessage(aMessage, controller)) {
         MessageContext msgContext = aMessage.toMessageContext(controller.getName());
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage",
-                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_new_msg_recvd__FINEST",
-                new Object[] { controller.getComponentName(), aMessage.toString() });
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+                  "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAEE_new_msg_recvd__FINEST",
+                  new Object[] { controller.getComponentName(), aMessage.toString() });
         }
         if (!concurrentThreads.containsKey(Thread.currentThread().getId())) {
           Thread.currentThread().setName(
-                  Thread.currentThread().getName() + "::" + controller.getComponentName()+ "::"+Thread.currentThread().getId());
+                  Thread.currentThread().getName() + "::" + controller.getComponentName() + "::"
+                          + Thread.currentThread().getId());
           // Store the thread identifier in the map. The value stored is not important. All
           // we want is to save the fact that the thread name has been changed. And we only
           // want to change it once
           concurrentThreads.put(Thread.currentThread().getId(), Thread.currentThread().getName());
         }
         requestType = aMessage.getIntProperty(AsynchAEMessage.Command);
-        if ( requestType == AsynchAEMessage.Stop ) {
+        if (requestType == AsynchAEMessage.Stop) {
           return;
         }
         // Determine if this message is a request and either GetMeta, CPC, or Process
@@ -101,17 +103,17 @@
         if (doCheckpoint) {
           controller.beginProcess(requestType);
         }
-        //  Process the message.
+        // Process the message.
         handler.handle(msgContext);
       }
-    } catch( InterruptedException e) {
+    } catch (InterruptedException e) {
       System.out.println("VMTransport Latch Interrupted - Processor is Stopping");
     } catch (Exception e) {
       e.printStackTrace();
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
-              "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-              "UIMAEE_exception__WARNING", new Object[] { e });
+                "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_exception__WARNING", new Object[] { e });
       }
     } finally {
       // Call the end checkpoint for non-aggregates. For primitives the CAS has been fully processed
@@ -121,6 +123,7 @@
       }
     }
   }
+
   /**
    * Initializes this listener. Instantiates and links message handlers. O
    */

Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java?rev=810562&r1=810561&r2=810562&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueue.java Wed Sep  2 15:25:17 2009
@@ -3,62 +3,69 @@
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
- * This is a JMX wrapper around the {@link LinkedBlockingQueue}. It exposes
- * the following queue statistics:
+ * This is a JMX wrapper around the {@link LinkedBlockingQueue}. It exposes the following queue
+ * statistics:
  * <ul>
- *    <li>size - the number of items on the queue</li>
- *    <li>consumerCount - number of concurrent consuming threads taking items from this queue.</li>
- *    <li>dequeueCount - total number of items consumed so far </li>   
+ * <li>size - the number of items on the queue</li>
+ * <li>consumerCount - number of concurrent consuming threads taking items from this queue.</li>
+ * <li>dequeueCount - total number of items consumed so far</li>
  * </ul>
- *
+ * 
  */
-public class UimaVmQueue extends LinkedBlockingQueue<Runnable> 
-implements UimaVmQueueMBean {
+public class UimaVmQueue extends LinkedBlockingQueue<Runnable> implements UimaVmQueueMBean {
   private static final long serialVersionUID = 1L;
+
   private int consumerCount = 0;
+
   private long dequeueCount = 0;
-  
-  public UimaVmQueue( int size ) {
-   // super(size);
+
+  public UimaVmQueue(int size) {
+    // super(size);
   }
- 
+
   /**
    * Returns the current number of items in the queue.
-   */  
+   */
   public int getQueueSize() {
     return super.size();
   }
+
   /**
    * Returns total number of items dequeued so far
    */
   public long getDequeueCount() {
     return dequeueCount;
   }
+
   /**
-   * Override of the method in the super class to enable counting of items
-   * taken (dequeued) off the queue.
+   * Override of the method in the super class to enable counting of items taken (dequeued) off the
+   * queue.
    */
   public Runnable take() throws InterruptedException {
-    Runnable work =  super.take();
-    if ( work != null ) {
+    Runnable work = super.take();
+    if (work != null) {
       dequeueCount++;
     }
     return work;
   }
+
   /**
-   * Returns total number of concurrent threads consuming work from this
-   * queue.  
+   * Returns total number of concurrent threads consuming work from this queue.
    */
   public int getConsumerCount() {
     return consumerCount;
   }
+
   /**
    * Sets the number of concurrent threads consuming work from this queue
-   * @param aConsumerCount - number of consuming threads
+   * 
+   * @param aConsumerCount
+   *          - number of consuming threads
    */
   public void setConsumerCount(int aConsumerCount) {
     consumerCount = aConsumerCount;
   }
+
   /**
    * Resets both the queue size and dequeue count to zero
    */
@@ -66,5 +73,5 @@
     super.clear();
     dequeueCount = 0;
   }
-  
+
 }

Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java?rev=810562&r1=810561&r2=810562&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmQueueMBean.java Wed Sep  2 15:25:17 2009
@@ -2,11 +2,14 @@
 
 import java.io.Serializable;
 
-public interface UimaVmQueueMBean extends Serializable{
-  
+public interface UimaVmQueueMBean extends Serializable {
+
   public int getQueueSize();
+
   public int getConsumerCount();
+
   public long getDequeueCount();
+
   public void reset();
-  
+
 }

Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java?rev=810562&r1=810561&r2=810562&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java Wed Sep  2 15:25:17 2009
@@ -50,10 +50,10 @@
 import org.apache.uima.util.Level;
 
 /**
- * This class provides implementation for internal messaging between collocated Uima AS services.
- * It uses {@link UimaMessageDispatcher} to send messages to {@link UimaMessageListener}.
+ * This class provides implementation for internal messaging between collocated Uima AS services. It
+ * uses {@link UimaMessageDispatcher} to send messages to {@link UimaMessageListener}.
+ * 
  * 
- *
  */
 public class VmTransport implements UimaTransport {
   private static final Class CLASS_NAME = VmTransport.class;
@@ -65,8 +65,9 @@
   private ThreadPoolExecutor executor = null;
 
   private ThreadGroup threadGroup = null;
-  //  Create a queue for work items. The queue has a JMX wrapper to expose the 
-  //  size.
+
+  // Create a queue for work items. The queue has a JMX wrapper to expose the
+  // size.
   private BlockingQueue<Runnable> workQueue = null;
 
   private UimaVmMessageDispatcher dispatcher;
@@ -76,19 +77,21 @@
   private AnalysisEngineController controller;
 
   private UimaAsContext context;
-  
+
   private AtomicBoolean stopping = new AtomicBoolean(false);
 
   public VmTransport(UimaAsContext aContext, AnalysisEngineController aController) {
     context = aContext;
     UIDGenerator idGenerator = new UIDGenerator();
     controller = aController;
-    threadGroup = new ThreadGroup("VmThreadGroup"+idGenerator.nextId()+"_"+controller.getComponentName());
+    threadGroup = new ThreadGroup("VmThreadGroup" + idGenerator.nextId() + "_"
+            + controller.getComponentName());
   }
 
   public void addSpiListener(SpiListener listener) {
     spiListeners.add(listener);
   }
+
   public UimaVmMessage produceMessage() {
     return new UimaVmMessage();
   }
@@ -119,50 +122,51 @@
   }
 
   public synchronized void stopIt() throws UimaSpiException {
-    if ( stopping.get() == true ) {
+    if (stopping.get() == true) {
       return;
     }
     stopping.set(true);
     executor.purge();
     executor.shutdownNow();
     workQueue.clear();
-    Set <Entry<String, UimaVmMessageDispatcher>> set = dispatchers.entrySet();
-    for( Entry<String, UimaVmMessageDispatcher> entry: set) {
+    Set<Entry<String, UimaVmMessageDispatcher>> set = dispatchers.entrySet();
+    for (Entry<String, UimaVmMessageDispatcher> entry : set) {
       UimaVmMessageDispatcher dispatcher = entry.getValue();
       dispatcher.stop();
     }
-    if ( threadGroup != null ) {
-      //  Spin a thread where we wait for threads in the threadGroup to stop.
-      new Thread(threadGroup.getParent(),threadGroup.getName()+":Reaper") {
-          public void run() {
-            
-            while ( threadGroup.activeCount() > 0) {
-              synchronized(this) {
-                try {
-                  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-                    threadGroup.list();
-                  }
-                  wait(1000);
-                } catch( InterruptedException ex) {
-                  
+    if (threadGroup != null) {
+      // Spin a thread where we wait for threads in the threadGroup to stop.
+      new Thread(threadGroup.getParent(), threadGroup.getName() + ":Reaper") {
+        public void run() {
+
+          while (threadGroup.activeCount() > 0) {
+            synchronized (this) {
+              try {
+                if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                  threadGroup.list();
                 }
+                wait(1000);
+              } catch (InterruptedException ex) {
+
               }
             }
-            try {
-              threadGroup.destroy();
-            } catch ( Exception e) {
-            } finally {
-              threadGroup = null;
-            }
           }
-        }.start();
+          try {
+            threadGroup.destroy();
+          } catch (Exception e) {
+          } finally {
+            threadGroup = null;
+          }
+        }
+      }.start();
     }
   }
+
   public void destroy() {
     try {
       stopIt();
-    } catch ( Exception e) {
-      
+    } catch (Exception e) {
+
     }
   }
 
@@ -170,12 +174,13 @@
     if (executor == null) {
       int concurrentConsumerCount = context.getConcurrentConsumerCount();
       workQueue = new UimaVmQueue(concurrentConsumerCount);
-      // Create a ThreadPoolExecutor with as many threads as needed. The pool has 
+      // Create a ThreadPoolExecutor with as many threads as needed. The pool has
       // a fixed number of threads that never expire and are never passivated.
-      executor = new ThreadPoolExecutor(concurrentConsumerCount, concurrentConsumerCount, Long.MAX_VALUE,
-              TimeUnit.NANOSECONDS, workQueue);
-      if ( controller instanceof PrimitiveAnalysisEngineController ) {
-        ThreadFactory tf = new UimaAsThreadFactory(threadGroup, (PrimitiveAnalysisEngineController)controller);
+      executor = new ThreadPoolExecutor(concurrentConsumerCount, concurrentConsumerCount,
+              Long.MAX_VALUE, TimeUnit.NANOSECONDS, workQueue);
+      if (controller instanceof PrimitiveAnalysisEngineController) {
+        ThreadFactory tf = new UimaAsThreadFactory(threadGroup,
+                (PrimitiveAnalysisEngineController) controller);
         executor.setThreadFactory(tf);
         executor.prestartAllCoreThreads();
       }
@@ -183,16 +188,20 @@
     return executor;
   }
 
-  public void registerWithJMX(AnalysisEngineController aController, String queueKind /* ReplyQueue or InputQueue */) {
+  public void registerWithJMX(AnalysisEngineController aController, String queueKind /*
+                                                                                      * ReplyQueue
+                                                                                      * or
+                                                                                      * InputQueue
+                                                                                      */) {
     try {
-      ((UimaVmQueue)workQueue).setConsumerCount(context.getConcurrentConsumerCount());
+      ((UimaVmQueue) workQueue).setConsumerCount(context.getConcurrentConsumerCount());
       aController.registerVmQueueWithJMX(workQueue, queueKind);
-    } catch( Exception e) {
+    } catch (Exception e) {
       e.printStackTrace();
     }
 
-
   }
+
   public UimaMessageDispatcher getMessageDispatcher() throws UimaSpiException {
     return dispatcher;
   }
@@ -201,8 +210,7 @@
     return listener;
   }
 
-  public UimaMessageListener produceUimaMessageListener()
-          throws UimaSpiException {
+  public UimaMessageListener produceUimaMessageListener() throws UimaSpiException {
     listener = new UimaVmMessageListener(controller);
     return listener;
   }
@@ -214,18 +222,19 @@
   public UimaMessageDispatcher getUimaMessageDispatcher(String aKey) throws UimaSpiException {
     return dispatchers.get(aKey);
   }
-  public UimaVmMessageDispatcher produceUimaMessageDispatcher(UimaTransport aTransport) throws UimaSpiException {
+
+  public UimaVmMessageDispatcher produceUimaMessageDispatcher(UimaTransport aTransport)
+          throws UimaSpiException {
     UimaVmMessageDispatcher dispatcher = null;
     String endpointName = (String) context.get("EndpointName");
-    if (!controller.isTopLevelComponent() ) {
+    if (!controller.isTopLevelComponent()) {
       endpointName = controller.getParentController().getName();
     }
     dispatcher = new UimaVmMessageDispatcher(((VmTransport) aTransport).getExecutorInstance(),
             ((VmTransport) aTransport).getUimaMessageListener(), endpointName);
-    if ( controller instanceof AggregateAnalysisEngineController ||
-            ((VmTransport) aTransport).controller instanceof AggregateAnalysisEngineController   
-       ) {
-      //  Store the dispatcher under delegate's name
+    if (controller instanceof AggregateAnalysisEngineController
+            || ((VmTransport) aTransport).controller instanceof AggregateAnalysisEngineController) {
+      // Store the dispatcher under delegate's name
       dispatchers.put(((VmTransport) aTransport).controller.getName(), dispatcher);
     } else {
       dispatchers.put(controller.getName(), dispatcher);