You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ctakes.apache.org by se...@apache.org on 2023/05/16 16:31:24 UTC

[ctakes] branch main updated: Annotation Engine wrapper for the PbjReceiver collection reader. Example piper that has all of the ctakes steps in 1 pipeline

This is an automated email from the ASF dual-hosted git repository.

seanfinan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ctakes.git


The following commit(s) were added to refs/heads/main by this push:
     new d0df59d  Annotation Engine wrapper for the PbjReceiver collection reader. Example piper that has all of the ctakes steps in 1 pipeline
d0df59d is described below

commit d0df59dd342bca1be6aa465f6cea96604ce770c1
Author: Sean Finan <se...@childrens.harvard.edu>
AuthorDate: Tue May 16 12:31:10 2023 -0400

    Annotation Engine wrapper for the PbjReceiver collection reader.
    Example piper that has all of the ctakes steps in 1 pipeline
---
 .../ctakes/examples/pipeline/WordFinder.piper      |   2 +-
 .../{WordFinder.piper => WordFinderInOne.piper}    |  33 +++---
 .../org/apache/ctakes/pbj/ae/PbjReceiverAE.java    | 121 +++++++++++++++++++++
 .../java/org/apache/ctakes/pbj/cr/PbjReceiver.java |  30 ++++-
 4 files changed, 168 insertions(+), 18 deletions(-)

diff --git a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper
index d6e4a3d..b6ec6ac 100644
--- a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper
+++ b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper
@@ -58,4 +58,4 @@ load DefaultTokenizerPipeline
 
 // Send CAS to Artemis at the specified queue.  Send stop signal when processing has finished.
 add PbjJmsSender SendQueue=JavaToPy SendStop=yes
-//add PbjStompSender SendQueue=JavaToPy SendStop=yes
+
diff --git a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinderInOne.piper
similarity index 71%
copy from ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper
copy to ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinderInOne.piper
index d6e4a3d..6539b03 100644
--- a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper
+++ b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinderInOne.piper
@@ -4,9 +4,6 @@
 #  This piper will start the Apache Artemis broker pointed to by the -a parameter on the command line.
 #  It will pause for 5 seconds to allow Artemis to fully launch.
 #
-#  This piper will then launch another instance of Apache cTAKES.
-#  That instance of cTAKES will run the third and final bit of the entire PBJ pipeline.
-#
 #  This piper will then launch a python PBJ bit of the entire pipeline.
 #
 set SetJavaHome=no
@@ -28,15 +25,6 @@ set SetJavaHome=no
 //  Sets up required parameters, starts your Artemis Broker, pips the PBJ project.
 load PbjStarter
 
-//
-// Start another instance of cTAKES, running the pipeline in StartAllExample_end.piper
-// $OutputDirectory will substitute the value of this cTAKES pipeline's value for OutputDirectory.
-// $ArtemisBroker will substitute the value of this cTAKES pipeline's value for ArtemisBroker.
-//
-
-add CtakesRunner Pipeline="-p PbjThirdStep -o $OutputDirectory -a $ArtemisBroker"
-
-
 //
 // Start the python bit of the full pipeline.
 //
@@ -50,7 +38,7 @@ add PythonRunner Command="-m $PbjSecondStep JavaToPy PyToJava" LogFile=word_find
 
 
 //
-// The pipeline run by this instance of cTAKES.
+// The pipeline run by this instance of cTAKES.  It includes a pbj sender and receiver.
 //
 
 // Load a simple token processing pipeline from another pipeline file
@@ -58,4 +46,21 @@ load DefaultTokenizerPipeline
 
 // Send CAS to Artemis at the specified queue.  Send stop signal when processing has finished.
 add PbjJmsSender SendQueue=JavaToPy SendStop=yes
-//add PbjStompSender SendQueue=JavaToPy SendStop=yes
+
+// At this point the python process started above should take the CAS from the JavaToPy queue, handle it, and send it back on the PyToJava queue.
+
+// Receive CAS from Artemis at the specified queue.
+add PbjReceiverAE ReceiveQueue=PyToJava
+
+// Save a nice table.
+add SemanticTableFileWriter SubDirectory=table
+
+// Save HTML.
+add pretty.html.HtmlTextWriter SubDirectory=html
+
+// Save marked text.
+add pretty.plaintext.PrettyTextWriterFit SubDirectory=text
+
+// Stop the Artemis Broker
+add ArtemisStopper
+
diff --git a/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjReceiverAE.java b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjReceiverAE.java
new file mode 100644
index 0000000..c8f96ea
--- /dev/null
+++ b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjReceiverAE.java
@@ -0,0 +1,121 @@
+package org.apache.ctakes.pbj.ae;
+
+import org.apache.ctakes.core.pipeline.PipeBitInfo;
+import org.apache.ctakes.pbj.cr.PbjReceiver;
+import org.apache.log4j.Logger;
+import org.apache.uima.UimaContext;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
+import org.apache.uima.cas.impl.CASImpl;
+import org.apache.uima.collection.CollectionException;
+import org.apache.uima.fit.component.JCasAnnotator_ImplBase;
+import org.apache.uima.fit.descriptor.ConfigurationParameter;
+import org.apache.uima.jcas.JCas;
+import org.apache.uima.resource.ResourceInitializationException;
+
+import java.io.IOException;
+
+import static org.apache.ctakes.pbj.cr.PbjReceiver.*;
+import static org.apache.ctakes.pbj.util.PbjConstants.*;
+
+/** Annotator that wraps a {@link PbjReceiver}: every process(..) call resets the jcas and has the receiver refill it when the receiver has a next item.
+ * @author SPF , chip-nlp
+ * @since {5/16/2023}
+ */
+@PipeBitInfo(
+      name = "PbjReceiverAE",
+      description = "Annotation Engine wrapper for the PbjReceiver.",
+      role = PipeBitInfo.Role.ANNOTATOR
+)
+public class PbjReceiverAE extends JCasAnnotator_ImplBase {
+
+   static private final Logger LOGGER = Logger.getLogger( "PbjReceiverAE" ); // NOTE(review): not referenced anywhere else in this class.
+
+   // Duplicates of all the PbjReceiver configuration parameters.  initialize(..) copies each value to the delegate through its setters.
+
+   @ConfigurationParameter(
+         name = PARAM_RECEIVER_NAME,
+         description = DESC_RECEIVER_NAME,
+         mandatory = false,
+         defaultValue = DEFAULT_USER
+   )
+   private String _userName;
+
+   @ConfigurationParameter(
+         name = PARAM_RECEIVER_PASS,
+         description = DESC_RECEIVER_PASS,
+         mandatory = false,
+         defaultValue = DEFAULT_PASS
+   )
+   private String _password;
+
+
+   @ConfigurationParameter(
+         name = PARAM_HOST,
+         description = DESC_HOST,
+         mandatory = false,
+         defaultValue = DEFAULT_HOST
+   )
+   private String _host;
+
+   @ConfigurationParameter(
+         name = PARAM_PORT,
+         description = DESC_PORT,
+         mandatory = false
+   )
+   private int _port = DEFAULT_PORT; // No defaultValue attribute here; presumably the field initializer is what applies when the parameter is absent - TODO confirm.
+
+   @ConfigurationParameter(
+         name = PARAM_QUEUE,
+         description = DESC_QUEUE
+   )
+   private String _queue; // The only parameter in this class that is not declared with mandatory = false.
+
+   @ConfigurationParameter(
+         name = PARAM_ACCEPT_STOP,
+         description = DESC_ACCEPT_STOP,
+         mandatory = false,
+         defaultValue = DEFAULT_ACCEPT_STOP
+   )
+   private String _acceptStop;
+
+
+   private PbjReceiver _delegate; // Created and configured in initialize(..); process(..) asks it for each item.
+
+
+   /**
+    * {@inheritDoc}  Also creates the delegate PbjReceiver, copies this engine's parameter values to it, and initializes it with the same context.
+    */
+   @Override
+   public void initialize( final UimaContext context ) throws ResourceInitializationException {
+      super.initialize( context );
+      _delegate = new PbjReceiver();
+      // ConfigParam initialization for CollectionReaders is done in initialize(), without a UimaContext.
+      // That is unfortunate for us, but we can set all the parameter values explicitly before initializing the delegate.
+      _delegate.setUserName( _userName );
+      _delegate.setPassword( _password );
+      _delegate.setHost( _host );
+      _delegate.setPort( _port );
+      _delegate.setQueue( _queue );
+      _delegate.setAcceptStop( _acceptStop );
+      _delegate.initialize( context );
+   }
+
+   /**
+    * {@inheritDoc}  Resets the given jcas, then has the delegate fill it if the delegate reports a next item; otherwise the jcas is left as reset.
+    */
+   @Override
+   public void process( final JCas jcas ) throws AnalysisEngineProcessException {
+      // From https://issues.apache.org/jira/browse/UIMA-1718 : unlock the cas first, presumably so that the reset() below is permitted inside process(..).
+      ((CASImpl)jcas.getCas()).restoreClassLoaderUnlockCas();
+      jcas.reset();
+      try {
+         if ( _delegate.hasNext() ) {
+            _delegate.getNext( jcas );
+         }
+      } catch ( CollectionException | IOException cE ) {
+         throw new AnalysisEngineProcessException( cE ); // Rethrown as the exception type that process(..) declares, wrapping the delegate's exception.
+      }
+   }
+
+
+}
diff --git a/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/cr/PbjReceiver.java b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/cr/PbjReceiver.java
index 5e880a1..e974107 100644
--- a/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/cr/PbjReceiver.java
+++ b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/cr/PbjReceiver.java
@@ -35,9 +35,8 @@ import static org.apache.ctakes.pbj.util.PbjConstants.*;
       role = PipeBitInfo.Role.SPECIAL
 )
 
-public class PbjReceiver extends JCasCollectionReader_ImplBase {
+final public class PbjReceiver extends JCasCollectionReader_ImplBase {
 
-   // to add a configuration parameter, type "param" and hit tab.
    static public final String PARAM_RECEIVER_NAME = "ReceiverName";
    static public final String PARAM_RECEIVER_PASS = "ReceiverPass";
    static public final String PARAM_HOST = "ReceiveHost";
@@ -108,6 +107,31 @@ public class PbjReceiver extends JCasCollectionReader_ImplBase {
    private String _messageText = "";
 
 
+   public void setUserName( final String userName ) { // Stores the user name in _userName; PbjReceiverAE calls this before initialize( context ).
+      _userName = userName;
+   }
+
+   public void setPassword( final String password ) { // Stores the password in _password; PbjReceiverAE calls this before initialize( context ).
+      _password = password;
+   }
+
+   public void setHost( final String host ) { // Stores the host in _host; PbjReceiverAE calls this before initialize( context ).
+      _host  = host;
+   }
+
+   public void setPort( final int port ) { // Stores the port in _port; PbjReceiverAE calls this before initialize( context ).
+      _port = port;
+   }
+
+   public void setQueue( final String queue ) { // Stores the queue name in _queue; PbjReceiverAE calls this before initialize( context ).
+      _queue = queue;
+   }
+
+   public void setAcceptStop( final String acceptStop ) { // Stores the accept-stop setting (a String) in _acceptStop; PbjReceiverAE calls this before initialize( context ).
+      _acceptStop = acceptStop;
+   }
+
+
    /**
     * Creates and starts ActiveMQ connection which uses the configuration provided by user.
     * {@inheritDoc}
@@ -220,7 +244,7 @@ public class PbjReceiver extends JCasCollectionReader_ImplBase {
       try {
          _connection.stop();
          _connection.close();
-         LOGGER.info( "Disconnected PBJ Sender on " + _host + " " + _queue + " ..." );
+         LOGGER.info( "Disconnected PBJ Receiver on " + _host + " " + _queue + " ..." );
       } catch ( JMSException jmsE ) {
          if ( jmsE.getMessage().equalsIgnoreCase( "Connection is closed" ) ) {
             return;