You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ctakes.apache.org by se...@apache.org on 2023/05/16 16:31:24 UTC
[ctakes] branch main updated: Annotation Engine wrapper for the PbjReceiver collection reader. Example piper that has all of the ctakes steps in 1 pipeline
This is an automated email from the ASF dual-hosted git repository.
seanfinan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ctakes.git
The following commit(s) were added to refs/heads/main by this push:
new d0df59d Annotation Engine wrapper for the PbjReceiver collection reader. Example piper that has all of the ctakes steps in 1 pipeline
d0df59d is described below
commit d0df59dd342bca1be6aa465f6cea96604ce770c1
Author: Sean Finan <se...@childrens.harvard.edu>
AuthorDate: Tue May 16 12:31:10 2023 -0400
Annotation Engine wrapper for the PbjReceiver collection reader.
Example piper that has all of the ctakes steps in 1 pipeline
---
.../ctakes/examples/pipeline/WordFinder.piper | 2 +-
.../{WordFinder.piper => WordFinderInOne.piper} | 33 +++---
.../org/apache/ctakes/pbj/ae/PbjReceiverAE.java | 121 +++++++++++++++++++++
.../java/org/apache/ctakes/pbj/cr/PbjReceiver.java | 30 ++++-
4 files changed, 168 insertions(+), 18 deletions(-)
diff --git a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper
index d6e4a3d..b6ec6ac 100644
--- a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper
+++ b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper
@@ -58,4 +58,4 @@ load DefaultTokenizerPipeline
// Send CAS to Artemis at the specified queue. Send stop signal when processing has finished.
add PbjJmsSender SendQueue=JavaToPy SendStop=yes
-//add PbjStompSender SendQueue=JavaToPy SendStop=yes
+
diff --git a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinderInOne.piper
similarity index 71%
copy from ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper
copy to ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinderInOne.piper
index d6e4a3d..6539b03 100644
--- a/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinder.piper
+++ b/ctakes-examples/src/user/resources/org/apache/ctakes/examples/pipeline/WordFinderInOne.piper
@@ -4,9 +4,6 @@
# This piper will start the Apache Artemis broker pointed to by the -a parameter on the command line.
# It will pause for 5 seconds to allow Artemis to fully launch.
#
-# This piper will then launch another instance of Apache cTAKES.
-# That instance of cTAKES will run the third and final bit of the entire PBJ pipeline.
-#
# This piper will then launch a python PBJ bit of the entire pipeline.
#
set SetJavaHome=no
@@ -28,15 +25,6 @@ set SetJavaHome=no
// Sets up required parameters, starts your Artemis Broker, and pip-installs the PBJ project.
load PbjStarter
-//
-// Start another instance of cTAKES, running the pipeline in StartAllExample_end.piper
-// $OutputDirectory will substitute the value of this cTAKES pipeline's value for OutputDirectory.
-// $ArtemisBroker will substitute the value of this cTAKES pipeline's value for ArtemisBroker.
-//
-
-add CtakesRunner Pipeline="-p PbjThirdStep -o $OutputDirectory -a $ArtemisBroker"
-
-
//
// Start the python bit of the full pipeline.
//
@@ -50,7 +38,7 @@ add PythonRunner Command="-m $PbjSecondStep JavaToPy PyToJava" LogFile=word_find
//
-// The pipeline run by this instance of cTAKES.
+// The pipeline run by this instance of cTAKES. It includes a pbj sender and receiver.
//
// Load a simple token processing pipeline from another pipeline file
@@ -58,4 +46,21 @@ load DefaultTokenizerPipeline
// Send CAS to Artemis at the specified queue. Send stop signal when processing has finished.
add PbjJmsSender SendQueue=JavaToPy SendStop=yes
-//add PbjStompSender SendQueue=JavaToPy SendStop=yes
+
+// At this point the python process should handle the CAS before sending it back to this pipeline.
+
+// Receive CAS from Artemis at the specified queue.
+add PbjReceiverAE ReceiveQueue=PyToJava
+
+// Save a nice table.
+add SemanticTableFileWriter SubDirectory=table
+
+// Save HTML.
+add pretty.html.HtmlTextWriter SubDirectory=html
+
+// Save marked text.
+add pretty.plaintext.PrettyTextWriterFit SubDirectory=text
+
+// Stop the Artemis Broker
+add ArtemisStopper
+
diff --git a/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjReceiverAE.java b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjReceiverAE.java
new file mode 100644
index 0000000..c8f96ea
--- /dev/null
+++ b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/ae/PbjReceiverAE.java
@@ -0,0 +1,121 @@
+package org.apache.ctakes.pbj.ae;
+
+import org.apache.ctakes.core.pipeline.PipeBitInfo;
+import org.apache.ctakes.pbj.cr.PbjReceiver;
+import org.apache.log4j.Logger;
+import org.apache.uima.UimaContext;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
+import org.apache.uima.cas.impl.CASImpl;
+import org.apache.uima.collection.CollectionException;
+import org.apache.uima.fit.component.JCasAnnotator_ImplBase;
+import org.apache.uima.fit.descriptor.ConfigurationParameter;
+import org.apache.uima.jcas.JCas;
+import org.apache.uima.resource.ResourceInitializationException;
+
+import java.io.IOException;
+
+import static org.apache.ctakes.pbj.cr.PbjReceiver.*;
+import static org.apache.ctakes.pbj.util.PbjConstants.*;
+
+/**
+ * @author SPF , chip-nlp
+ * @since {5/16/2023}
+ */
+@PipeBitInfo(
+ name = "PbjReceiverAE",
+ description = "Annotation Engine wrapper for the PbjReceiver.",
+ role = PipeBitInfo.Role.ANNOTATOR
+)
+public class PbjReceiverAE extends JCasAnnotator_ImplBase {
+
+ static private final Logger LOGGER = Logger.getLogger( "PbjReceiverAE" );
+
+ // Duplicates of all the PbjReceiver configuration parameters.
+
+ @ConfigurationParameter(
+ name = PARAM_RECEIVER_NAME,
+ description = DESC_RECEIVER_NAME,
+ mandatory = false,
+ defaultValue = DEFAULT_USER
+ )
+ private String _userName;
+
+ @ConfigurationParameter(
+ name = PARAM_RECEIVER_PASS,
+ description = DESC_RECEIVER_PASS,
+ mandatory = false,
+ defaultValue = DEFAULT_PASS
+ )
+ private String _password;
+
+
+ @ConfigurationParameter(
+ name = PARAM_HOST,
+ description = DESC_HOST,
+ mandatory = false,
+ defaultValue = DEFAULT_HOST
+ )
+ private String _host;
+
+ @ConfigurationParameter(
+ name = PARAM_PORT,
+ description = DESC_PORT,
+ mandatory = false
+ )
+ private int _port = DEFAULT_PORT;
+
+ @ConfigurationParameter(
+ name = PARAM_QUEUE,
+ description = DESC_QUEUE
+ )
+ private String _queue;
+
+ @ConfigurationParameter(
+ name = PARAM_ACCEPT_STOP,
+ description = DESC_ACCEPT_STOP,
+ mandatory = false,
+ defaultValue = DEFAULT_ACCEPT_STOP
+ )
+ private String _acceptStop;
+
+
+ private PbjReceiver _delegate;
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void initialize( final UimaContext context ) throws ResourceInitializationException {
+ super.initialize( context );
+ _delegate = new PbjReceiver();
+ // ConfigParam initialization for CollectionReaders is done in the no-argument initialize(), without a UimaContext.
+ // Calling initialize( context ) directly skips that step, so set every parameter value on the delegate explicitly first.
+ _delegate.setUserName( _userName );
+ _delegate.setPassword( _password );
+ _delegate.setHost( _host );
+ _delegate.setPort( _port );
+ _delegate.setQueue( _queue );
+ _delegate.setAcceptStop( _acceptStop );
+ _delegate.initialize( context );
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void process( final JCas jcas ) throws AnalysisEngineProcessException {
+ // Workaround from https://issues.apache.org/jira/browse/UIMA-1718 : unlock the CAS so that it can be reset below.
+ ((CASImpl)jcas.getCas()).restoreClassLoaderUnlockCas();
+ jcas.reset();
+ try {
+ if ( _delegate.hasNext() ) {
+ _delegate.getNext( jcas );
+ }
+ } catch ( CollectionException | IOException cE ) {
+ throw new AnalysisEngineProcessException( cE );
+ }
+ }
+
+
+}
diff --git a/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/cr/PbjReceiver.java b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/cr/PbjReceiver.java
index 5e880a1..e974107 100644
--- a/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/cr/PbjReceiver.java
+++ b/ctakes-pbj/src/main/java/org/apache/ctakes/pbj/cr/PbjReceiver.java
@@ -35,9 +35,8 @@ import static org.apache.ctakes.pbj.util.PbjConstants.*;
role = PipeBitInfo.Role.SPECIAL
)
-public class PbjReceiver extends JCasCollectionReader_ImplBase {
+final public class PbjReceiver extends JCasCollectionReader_ImplBase {
- // to add a configuration parameter, type "param" and hit tab.
static public final String PARAM_RECEIVER_NAME = "ReceiverName";
static public final String PARAM_RECEIVER_PASS = "ReceiverPass";
static public final String PARAM_HOST = "ReceiveHost";
@@ -108,6 +107,31 @@ public class PbjReceiver extends JCasCollectionReader_ImplBase {
private String _messageText = "";
+ public void setUserName( final String userName ) {
+ _userName = userName;
+ }
+
+ public void setPassword( final String password ) {
+ _password = password;
+ }
+
+ public void setHost( final String host ) {
+ _host = host;
+ }
+
+ public void setPort( final int port ) {
+ _port = port;
+ }
+
+ public void setQueue( final String queue ) {
+ _queue = queue;
+ }
+
+ public void setAcceptStop( final String acceptStop ) {
+ _acceptStop = acceptStop;
+ }
+
+
/**
* Creates and starts an ActiveMQ connection using the configuration provided by the user.
* {@inheritDoc}
@@ -220,7 +244,7 @@ public class PbjReceiver extends JCasCollectionReader_ImplBase {
try {
_connection.stop();
_connection.close();
- LOGGER.info( "Disconnected PBJ Sender on " + _host + " " + _queue + " ..." );
+ LOGGER.info( "Disconnected PBJ Receiver on " + _host + " " + _queue + " ..." );
} catch ( JMSException jmsE ) {
if ( jmsE.getMessage().equalsIgnoreCase( "Connection is closed" ) ) {
return;