You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by re...@apache.org on 2020/03/24 09:54:40 UTC
[uima-uimafit] 01/01: [UIMA-6207] Allow setting number of threads
in CpePipeline
This is an automated email from the ASF dual-hosted git repository.
rec pushed a commit to branch feature/UIMA-6207-Allow-setting-number-of-threads-in-CpePipeline
in repository https://gitbox.apache.org/repos/asf/uima-uimafit.git
commit 36ea029533a5258e8b6cb8762a9f6bbe68361d78
Author: Richard Eckart de Castilho <re...@apache.org>
AuthorDate: Tue Mar 24 10:50:21 2020 +0100
[UIMA-6207] Allow setting number of threads in CpePipeline
- Added parameter/signature to allow specifying the number of threads
---
.../java/org/apache/uima/fit/cpe/CpePipeline.java | 44 ++++++++++++++++++++--
1 file changed, 41 insertions(+), 3 deletions(-)
diff --git a/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java b/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java
index 4d81500..056cee2 100644
--- a/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java
+++ b/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java
@@ -18,6 +18,7 @@
*/
package org.apache.uima.fit.cpe;
+import static java.lang.Runtime.getRuntime;
import static org.apache.uima.fit.factory.AnalysisEngineFactory.createEngineDescription;
import java.io.IOException;
@@ -43,8 +44,38 @@ public final class CpePipeline {
}
/**
+ * Run the CollectionReader and AnalysisEngines as a multi-threaded pipeline. This call uses
+ * a number of threads equal to the number of available processors (as reported by Java, so
+ * usually boiling down to cores) minus 1 - minimum of 1.
+ *
+ * @param readerDesc
+ * The CollectionReader that loads the documents into the CAS.
+ * @param descs
+ * Primitive AnalysisEngineDescriptions that process the CAS, in order. If you have a mix
+ * of primitive and aggregate engines, then please create the AnalysisEngines yourself
+ * and call the other runPipeline method.
+ * @throws SAXException
+ * if there was a XML-related problem materializing the component descriptors that are
+ * referenced from the CPE descriptor
+ * @throws IOException
+ * if there was a I/O-related problem materializing the component descriptors that are
+ * referenced from the CPE descriptor
+ * @throws CpeDescriptorException
+ * if there was a problem configuring the CPE descriptor
+ * @throws UIMAException
+ * if there was a problem initializing or running the CPE.
+ */ public static void runPipeline(final CollectionReaderDescription readerDesc,
+ final AnalysisEngineDescription... descs)
+ throws UIMAException, SAXException, CpeDescriptorException, IOException {
+
+ runPipeline(Math.max(1, getRuntime().availableProcessors() - 1), readerDesc, descs);
+ }
+
+ /**
* Run the CollectionReader and AnalysisEngines as a multi-threaded pipeline.
*
+ * @param parallelism
+ * Number of threads to use when running the analysis engines in the CPE.
* @param readerDesc
* The CollectionReader that loads the documents into the CAS.
* @param descs
@@ -62,9 +93,9 @@ public final class CpePipeline {
* @throws UIMAException
* if there was a problem initializing or running the CPE.
*/
- public static void runPipeline(final CollectionReaderDescription readerDesc,
- final AnalysisEngineDescription... descs) throws UIMAException, SAXException,
- CpeDescriptorException, IOException {
+ public static void runPipeline(final int parallelism,
+ final CollectionReaderDescription readerDesc, final AnalysisEngineDescription... descs)
+ throws UIMAException, SAXException, CpeDescriptorException, IOException {
// Create AAE
final AnalysisEngineDescription aaeDesc = createEngineDescription(descs);
@@ -98,6 +129,7 @@ public final class CpePipeline {
private boolean isProcessing = true;
+ @Override
public void entityProcessComplete(CAS arg0, EntityProcessStatus arg1) {
if (arg1.isException()) {
for (Exception e : arg1.getExceptions()) {
@@ -106,6 +138,7 @@ public final class CpePipeline {
}
}
+ @Override
public void aborted() {
synchronized (this) {
if (isProcessing) {
@@ -115,10 +148,12 @@ public final class CpePipeline {
}
}
+ @Override
public void batchProcessComplete() {
// Do nothing
}
+ @Override
public void collectionProcessComplete() {
synchronized (this) {
if (isProcessing) {
@@ -128,14 +163,17 @@ public final class CpePipeline {
}
}
+ @Override
public void initializationComplete() {
// Do nothing
}
+ @Override
public void paused() {
// Do nothing
}
+ @Override
public void resumed() {
// Do nothing
}