You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by re...@apache.org on 2022/01/19 12:38:11 UTC
[uima-uimaj] 01/01: [UIMA-6412] Stop using ThreadGroup in CPMEngine
This is an automated email from the ASF dual-hosted git repository.
rec pushed a commit to branch feature/UIMA-6412-Stop-using-ThreadGroup-in-CPMEngine
in repository https://gitbox.apache.org/repos/asf/uima-uimaj.git
commit ac4221f5d7bd33704896aeffd1f772740926de81
Author: Richard Eckart de Castilho <re...@apache.org>
AuthorDate: Wed Jan 19 13:38:04 2022 +0100
[UIMA-6412] Stop using ThreadGroup in CPMEngine
- Refactor the CPMThreadGroup into a CPMExecutorService
- Replace Thread.join() calls with Future.get() calls
- Replace the ThreadGroupDestroyer thread with a simple call to shutdown()
- Replace the uncaught exception handler with overriding ThreadPoolExecutor.afterExecute()
- Clean up and format a few classes
---
.../uima/collection/impl/cpm/BaseCPMImpl.java | 375 ++++++++++++---------
.../uima/collection/impl/cpm/Checkpoint.java | 7 -
.../uima/collection/impl/cpm/engine/CPMEngine.java | 222 ++++++------
...CPMThreadGroup.java => CPMExecutorService.java} | 99 +++---
.../impl/cpm/engine/DebugControlThread.java | 32 +-
5 files changed, 401 insertions(+), 334 deletions(-)
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java
index 972e7ee..05fa071 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java
@@ -21,13 +21,13 @@ package org.apache.uima.collection.impl.cpm;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Future;
import org.apache.uima.UIMAFramework;
import org.apache.uima.UIMARuntimeException;
@@ -44,7 +44,7 @@ import org.apache.uima.collection.impl.base_cpm.container.ProcessingContainer;
import org.apache.uima.collection.impl.cpm.container.CPEFactory;
import org.apache.uima.collection.impl.cpm.container.deployer.socket.ProcessControllerAdapter;
import org.apache.uima.collection.impl.cpm.engine.CPMEngine;
-import org.apache.uima.collection.impl.cpm.engine.CPMThreadGroup;
+import org.apache.uima.collection.impl.cpm.engine.CPMExecutorService;
import org.apache.uima.collection.impl.cpm.utils.CPMUtils;
import org.apache.uima.collection.impl.cpm.utils.CasMetaData;
import org.apache.uima.collection.impl.cpm.utils.CpmLocalizedMessage;
@@ -62,15 +62,12 @@ import org.apache.uima.util.Progress;
import org.apache.uima.util.UimaTimer;
import org.apache.uima.util.impl.ProcessTrace_impl;
-
/**
* Main thread that launches CPE and manages it. An application interacts with the running CPE via
* this object. Through an API, an application may start, pause, resume, and stop a CPE.
- *
- *
*/
public class BaseCPMImpl implements BaseCPM, Runnable {
-
+
/** The default process trace. */
private boolean defaultProcessTrace;
@@ -107,55 +104,60 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
/** The m event type map. */
private Map mEventTypeMap;
- /** The cpm thread group. */
- public CPMThreadGroup cpmThreadGroup = null;
+ /** The CPE executor service. */
+ public CPMExecutorService cpmExecutorService = null;
/**
* Instantiates and initializes CPE Factory with a given CPE Descriptor and defaults.
*
- * @param aDescriptor -
- * parsed CPE descriptor
- * @throws Exception -
+ * @param aDescriptor
+ * - parsed CPE descriptor
+ * @throws Exception
+ * -
*/
public BaseCPMImpl(CpeDescription aDescriptor) throws Exception {
this(aDescriptor, null, true, UIMAFramework.getDefaultPerformanceTuningProperties());
- cpmThreadGroup = new CPMThreadGroup("CPM Thread Group");
+ cpmExecutorService = new CPMExecutorService();
}
/**
* Instantiates and initializes CPE Factory responsible for creating individual components that
* are part of the processing pipeline.
*
- * @param aDescriptor -
- * parsed CPE descriptor
- * @param aResourceManager -
- * ResourceManager instance to be used by the CPE
- * @param aDefaultProcessTrace -
- * ProcessTrace instance to capture events and stats
- * @param aProps the a props
- * @throws Exception -
+ * @param aDescriptor
+ * - parsed CPE descriptor
+ * @param aResourceManager
+ * - ResourceManager instance to be used by the CPE
+ * @param aDefaultProcessTrace
+ * - ProcessTrace instance to capture events and stats
+ * @param aProps
+ * the a props
+ * @throws Exception
+ * -
*/
public BaseCPMImpl(CpeDescription aDescriptor, ResourceManager aResourceManager,
boolean aDefaultProcessTrace, Properties aProps) throws Exception {
cpeFactory = new CPEFactory(aDescriptor, aResourceManager);
defaultProcessTrace = aDefaultProcessTrace;
- cpmThreadGroup = new CPMThreadGroup("CPM Thread Group");
+ cpmExecutorService = new CPMExecutorService();
init(false, aProps);
}
/**
* Parses CPE descriptor.
*
- * @param mode -
- * indicates if the CPM should use a static descriptor or one provided
- * @param aDescriptor -
- * provided descriptor path
- * @param aResourceManager ResourceManager to be used by CPM
- * @throws Exception -
+ * @param mode
+ * - indicates if the CPM should use a static descriptor or one provided
+ * @param aDescriptor
+ * - provided descriptor path
+ * @param aResourceManager
+ * ResourceManager to be used by CPM
+ * @throws Exception
+ * -
*/
public BaseCPMImpl(Boolean mode, String aDescriptor, ResourceManager aResourceManager)
throws Exception {
- cpmThreadGroup = new CPMThreadGroup("CPM Thread Group");
+ cpmExecutorService = new CPMExecutorService();
cpeFactory = new CPEFactory(aResourceManager);
if (mode == null) {
defaultProcessTrace = true;
@@ -170,7 +172,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
/**
* Plugs in custom perfomance tunning parameters.
*
- * @param aPerformanceTuningSettings the new performance tuning settings
+ * @param aPerformanceTuningSettings
+ * the new performance tuning settings
*/
public void setPerformanceTuningSettings(Properties aPerformanceTuningSettings) {
cpEngine.setPerformanceTuningSettings(aPerformanceTuningSettings);
@@ -180,8 +183,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
* Plugs in a given {@link ProcessControllerAdapter}. The CPM uses this adapter to request Cas
* Processor restarts and shutdown.
*
- * @param aPca -
- * instance of the ProcessControllerAdapter
+ * @param aPca
+ * - instance of the ProcessControllerAdapter
*/
public void setProcessControllerAdapter(ProcessControllerAdapter aPca) {
cpEngine.setProcessControllerAdapter(aPca);
@@ -192,7 +195,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
* use at the end of processing. Jedii-style reporting shows a summary for this run. The CPM
* default report shows more detail information.
*
- * @param aUseJediiReport the new jedii report
+ * @param aUseJediiReport
+ * the new jedii report
*/
public void setJediiReport(boolean aUseJediiReport) {
mEventTypeMap = new HashMap();
@@ -205,9 +209,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
/**
* Instantiates and initializes a CPE.
*
- * @param aDummyCasProcessor -
- * @param aProps the a props
- * @throws Exception -
+ * @param aDummyCasProcessor
+ * -
+ * @param aProps
+ * the a props
+ * @throws Exception
+ * -
*/
public void init(boolean aDummyCasProcessor, Properties aProps) throws Exception {
String uimaTimerClass = cpeFactory.getCPEConfig().getTimerImpl();
@@ -239,8 +246,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
}
if (checkpointFileName != null && checkpointFileName.trim().length() > 0) {
File checkpointFile = new File(checkpointFileName);
- checkpoint = new Checkpoint(this, checkpointFileName, cpeFactory.getCPEConfig()
- .getCheckpoint().getFrequency());
+ checkpoint = new Checkpoint(this, checkpointFileName,
+ cpeFactory.getCPEConfig().getCheckpoint().getFrequency());
// Check if the checkpoint file already exists. If it does, the CPM did not complete
// successfully during the previous run and CPM will start in recovery mode, restoring all
// totals and status's from the recovered checkpoint. The processing pipeline state will
@@ -259,7 +266,7 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
}
}
// Instantiate class responsible for processing
- cpEngine = new CPMEngine(cpmThreadGroup, cpeFactory, procTr, checkpointData);
+ cpEngine = new CPMEngine(cpmExecutorService, cpeFactory, procTr, checkpointData);
if (!aDummyCasProcessor) {
int concurrentThreadCount = cpeFactory.getCpeDescriptor().getCpeCasProcessors()
.getConcurrentPUCount();
@@ -267,12 +274,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
CasProcessor[] casProcessors = cpeFactory.getCasProcessors();
for (int i = 0; i < casProcessors.length; i++) {
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
- UIMAFramework.getLogger(this.getClass()).logrb(
- Level.CONFIG,
- this.getClass().getName(),
- "process",
- CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
- "UIMA_CPM_add_cp__CONFIG",
+ UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
+ "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_add_cp__CONFIG",
new Object[] { Thread.currentThread().getName(),
casProcessors[i].getProcessingResourceMetaData().getName() });
}
@@ -299,8 +302,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
cpEngine.setInputQueueSize(casPoolSize == 0 ? iqSize : casPoolSize);
} catch (NumberFormatException e) {
throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
- "UIMA_CPM_EXP_queue_size_not_defined__WARNING", new Object[] {
- Thread.currentThread().getName(), "inputQueueSize" }));
+ "UIMA_CPM_EXP_queue_size_not_defined__WARNING",
+ new Object[] { Thread.currentThread().getName(), "inputQueueSize" }));
}
try {
int oqSize = 0;
@@ -310,16 +313,17 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
cpEngine.setOutputQueueSize(casPoolSize == 0 ? oqSize : casPoolSize + 2);
} catch (NumberFormatException e) {
throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
- "UIMA_CPM_EXP_queue_size_not_defined__WARNING", new Object[] {
- Thread.currentThread().getName(), "outputQueueSize" }));
+ "UIMA_CPM_EXP_queue_size_not_defined__WARNING",
+ new Object[] { Thread.currentThread().getName(), "outputQueueSize" }));
}
try {
int threadCount = cpeFactory.getCpeDescriptor().getCpeCasProcessors().getConcurrentPUCount();
cpEngine.setConcurrentThreadSize(threadCount);
} catch (NumberFormatException e) {
throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
- "UIMA_CPM_EXP_invalid_component_reference__WARNING", new Object[] {
- Thread.currentThread().getName(), "casProcessors", "processingUnitThreadCount" }));
+ "UIMA_CPM_EXP_invalid_component_reference__WARNING",
+ new Object[] { Thread.currentThread().getName(), "casProcessors",
+ "processingUnitThreadCount" }));
}
}
@@ -327,13 +331,16 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
* Returns {@link CPEConfig} object holding current CPE configuration.
*
* @return CPEConfig instance
- * @throws Exception -
+ * @throws Exception
+ * -
*/
public CpeConfiguration getCPEConfig() throws Exception {
return cpeFactory.getCPEConfig();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.uima.collection.base_cpm.BaseCPM#getCasProcessors()
*/
/*
@@ -347,29 +354,40 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
return casProcs == null ? new CasProcessor[0] : casProcs;
}
- /* (non-Javadoc)
- * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+ * .CasProcessor)
*/
/*
* Adds given CasProcessor to the processing pipeline. A new CasProcessor is appended to the
* current list.
*
- * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+ * @see
+ * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+ * .CasProcessor)
*/
@Override
public void addCasProcessor(CasProcessor aCasProcessor) throws ResourceConfigurationException {
cpEngine.addCasProcessor(aCasProcessor);
}
- /* (non-Javadoc)
- * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor, int)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+ * .CasProcessor, int)
*/
/*
* Adds given CasProcessor to the processing pipeline. A new CasProcessor is inserted into a given
* spot in the current list.
*
- * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor,
- * int)
+ * @see
+ * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+ * .CasProcessor, int)
*/
@Override
public void addCasProcessor(CasProcessor aCasProcessor, int aIndex)
@@ -377,26 +395,34 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
cpEngine.addCasProcessor(aCasProcessor, aIndex);
}
- /* (non-Javadoc)
- * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.
+ * base_cpm.CasProcessor)
*/
/*
* Removes given CasProcessor from the processing pipeline.
*
- * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+ * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.
+ * base_cpm.CasProcessor)
*/
@Override
public void removeCasProcessor(CasProcessor aCasProcessor) {
cpEngine.removeCasProcessor(0);
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(java.lang.String)
*/
/*
* Disables given CasProcessor in the existing processing pipeline.
*
- * @see org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+ * @see
+ * org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.
+ * base_cpm.CasProcessor)
*/
@Override
public void disableCasProcessor(String aCasProcessorName) {
@@ -407,19 +433,24 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
/**
* Enable cas processor.
*
- * @param aCasProcessorName the a cas processor name
+ * @param aCasProcessorName
+ * the a cas processor name
*/
/*
* Disables given CasProcessor in the existing processing pipeline.
*
- * @see org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+ * @see
+ * org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.
+ * base_cpm.CasProcessor)
*/
public void enableCasProcessor(String aCasProcessorName) {
cpEngine.enableCasProcessor(aCasProcessorName);
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.uima.collection.base_cpm.BaseCPM#isSerialProcessingRequired()
*/
/*
@@ -440,7 +471,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
public void setSerialProcessingRequired(boolean aRequired) {
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.uima.collection.base_cpm.BaseCPM#isPauseOnException()
*/
/*
@@ -453,7 +486,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
return cpEngine.isPauseOnException();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.uima.collection.base_cpm.BaseCPM#setPauseOnException(boolean)
*/
/*
@@ -466,34 +501,44 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
cpEngine.setPauseOnException(aPause);
}
- /* (non-Javadoc)
- * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.
+ * collection.base_cpm.BaseStatusCallbackListener)
*/
/*
* Adds Event Listener. Important events like, end of entity processing, exceptions, etc will be
* sent to the registered listeners.
*
- * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+ * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.
+ * collection.base_cpm.BaseStatusCallbackListener)
*/
@Override
public void addStatusCallbackListener(BaseStatusCallbackListener aListener) {
cpEngine.addStatusCallbackListener(aListener);
}
- /* (non-Javadoc)
- * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.
+ * collection.base_cpm.BaseStatusCallbackListener)
*/
/*
* Remoces named listener from the listener list.
*
- * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+ * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.
+ * collection.base_cpm.BaseStatusCallbackListener)
*/
@Override
public void removeStatusCallbackListener(BaseStatusCallbackListener aListener) {
cpEngine.removeStatusCallbackListener(aListener);
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see java.lang.Runnable#run()
*/
/*
@@ -523,11 +568,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
// Start the checkpoint thread
if (checkpoint != null) {
- new Thread(checkpoint).start();
+ cpmExecutorService.submit(checkpoint);
}
- cpEngine.start();
- // Joing the CPMWorker Thread and wait until it finishes
- cpEngine.join();
+
+ Future<?> cpEngineResult = cpmExecutorService.submit(cpEngine);
+ // Joining the CPMWorker Thread and wait until it finishes
+ cpEngineResult.get();
completed = true;
// If the entire collection has been processed there is no need for a checkpoint.
@@ -569,14 +615,13 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
}
UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "" + e);
killed = true;
- ArrayList statusCbL = cpEngine.getCallbackListeners();
+ List<BaseStatusCallbackListener> statusCbL = cpEngine.getCallbackListeners();
EntityProcessStatusImpl enProcSt = new EntityProcessStatusImpl(procTr, true);
// e is the actual exception.
enProcSt.addEventStatus("CPM", "Failed", e);
// Notify all listeners that the CPM has finished processing
- for (int j = 0; j < statusCbL.size(); j++) {
- BaseStatusCallbackListener st = (BaseStatusCallbackListener) statusCbL.get(j);
+ for (BaseStatusCallbackListener st : statusCbL) {
if (st != null && st instanceof StatusCallbackListener) {
((StatusCallbackListener) st).entityProcessComplete(null, enProcSt);
}
@@ -592,11 +637,11 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cpm_stopped__FINEST",
new Object[] { Thread.currentThread().getName(), String.valueOf(killed) });
}
- ArrayList statusCbL = cpEngine.getCallbackListeners();
+
+ List<BaseStatusCallbackListener> statusCbL = cpEngine.getCallbackListeners();
// Notify all listeners that the CPM has finished processing
- for (int j = 0; j < statusCbL.size(); j++) {
- BaseStatusCallbackListener st = (BaseStatusCallbackListener) statusCbL.get(j);
- if ( st != null ) {
+ for (BaseStatusCallbackListener st : statusCbL) {
+ if (st != null) {
if (!killed) {
st.collectionProcessComplete();
} else {
@@ -619,10 +664,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
* This method is called by an applications to begin CPM processing with a given Collection. It
* just creates a new thread and starts it.
*
- * @param aCollectionReader the a collection reader
- * @throws ResourceInitializationException the resource initialization exception
+ * @param aCollectionReader
+ * the a collection reader
+ * @throws ResourceInitializationException
+ * the resource initialization exception
* @see org.apache.uima.collection.base_cpm.BaseCPM#process()
- * @deprecated
+ * @deprecated
*/
@Deprecated
public void process(BaseCollectionReader aCollectionReader)
@@ -637,9 +684,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
if (cpeFactory.isDefault()) {
cpeFactory.addCollectionReader(collectionReader);
}
- cpmThreadGroup.setProcessTrace(procTr);
- cpmThreadGroup.setListeners(cpEngine.getCallbackListeners());
- new Thread(this).start();
+ cpmExecutorService.setProcessTrace(procTr);
+ cpmExecutorService.setListeners(cpEngine.getCallbackListeners());
+ cpmExecutorService.submit(this);
}
/**
@@ -664,21 +711,24 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
if (cpeFactory.isDefault()) {
cpeFactory.addCollectionReader(collectionReader);
}
- cpmThreadGroup.setProcessTrace(procTr);
- cpmThreadGroup.setListeners(cpEngine.getCallbackListeners());
+ cpmExecutorService.setProcessTrace(procTr);
+ cpmExecutorService.setListeners(cpEngine.getCallbackListeners());
- new Thread(this).start();
+ cpmExecutorService.submit(this);
}
/**
* This method is called by an applications to begin CPM processing with a given Collection. It
* just creates a new thread and starts it.
*
- * @param aCollectionReader the a collection reader
- * @param aBatchSize the a batch size
- * @throws ResourceInitializationException the resource initialization exception
+ * @param aCollectionReader
+ * the a collection reader
+ * @param aBatchSize
+ * the a batch size
+ * @throws ResourceInitializationException
+ * the resource initialization exception
* @see org.apache.uima.collection.base_cpm.BaseCPM#process()
- * @deprecated
+ * @deprecated
*/
@Deprecated
public void process(BaseCollectionReader aCollectionReader, int aBatchSize)
@@ -689,10 +739,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
if (cpeFactory.isDefault()) {
cpeFactory.addCollectionReader(collectionReader);
}
- cpmThreadGroup.setProcessTrace(procTr);
- cpmThreadGroup.setListeners(cpEngine.getCallbackListeners());
-
- new Thread(cpmThreadGroup, this).start();
+ cpmExecutorService.setProcessTrace(procTr);
+ cpmExecutorService.setListeners(cpEngine.getCallbackListeners());
+ cpmExecutorService.submit(this);
}
/**
@@ -726,7 +775,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
}
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.uima.collection.base_cpm.BaseCPM#isProcessing()
*/
/*
@@ -740,7 +791,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
return cpEngine.isRunning();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.uima.collection.base_cpm.BaseCPM#pause()
*/
/*
@@ -757,7 +810,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
}
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.uima.collection.base_cpm.BaseCPM#isPaused()
*/
/*
@@ -770,7 +825,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
return cpEngine.isPaused();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.uima.collection.base_cpm.BaseCPM#resume(boolean)
*/
/*
@@ -783,7 +840,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
resume();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.uima.collection.base_cpm.BaseCPM#resume()
*/
/*
@@ -825,7 +884,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.uima.collection.base_cpm.BaseCPM#stop()
*/
/*
@@ -888,7 +949,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
/**
* Decode status.
*
- * @param aStatus the a status
+ * @param aStatus
+ * the a status
* @return the string
*/
/*
@@ -920,10 +982,14 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
/**
* Copy component events.
*
- * @param aEvType the a ev type
- * @param aList the a list
- * @param aPTr the a P tr
- * @throws IOException Signals that an I/O exception has occurred.
+ * @param aEvType
+ * the a ev type
+ * @param aList
+ * the a list
+ * @param aPTr
+ * the a P tr
+ * @throws IOException
+ * Signals that an I/O exception has occurred.
*/
/*
* Copies events of a given type found in the list to a provided ProcessTrace instance
@@ -946,10 +1012,10 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
/**
* Helper method to display stats and totals.
*
- * @param aProcessTrace -
- * trace containing stats
- * @param aNumDocsProcessed -
- * number of entities processed so far
+ * @param aProcessTrace
+ * - trace containing stats
+ * @param aNumDocsProcessed
+ * - number of entities processed so far
*/
public void displayStats(ProcessTrace aProcessTrace, int aNumDocsProcessed) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
@@ -965,10 +1031,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
// get the time.
if ("CPM".equals(event.getComponentName())) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(this.getClass()).log(
- Level.FINEST,
- "Current Component::" + event.getComponentName() + " Time::"
- + event.getDuration());
+ UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, "Current Component::"
+ + event.getComponentName() + " Time::" + event.getDuration());
}
continue;
}
@@ -1000,8 +1064,10 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
/**
* Helper method to help build the CPM report.
*
- * @param aEvent the a event
- * @param aTotalTime the a total time
+ * @param aEvent
+ * the a event
+ * @param aTotalTime
+ * the a total time
*/
public void buildEventTree(ProcessTraceEvent aEvent, int aTotalTime) {
// Skip reporting the CPM time.This time has already been acquired by summing up
@@ -1019,10 +1085,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
}
if (System.getProperty("DEBUG") != null) {
- UIMAFramework.getLogger(this.getClass()).log(
- Level.FINEST,
- "" + pct + "% (" + duration + "ms) - " + aEvent.getComponentName() + " (" + type
- + ")");
+ UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, "" + pct + "% (" + duration
+ + "ms) - " + aEvent.getComponentName() + " (" + type + ")");
}
Iterator it = aEvent.getSubEvents().iterator();
while (it.hasNext()) {
@@ -1103,16 +1167,16 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
Long totalCollectionReaderTime = (Long) perfReport.get("COLLECTION_READER_TIME");
String readerName = collectionReader.getProcessingResourceMetaData().getName();
if (totalCollectionReaderTime != null) {
- processTrace.addEvent(readerName, "COLLECTION_READER_TIME", String
- .valueOf(totalCollectionReaderTime), 0, null);
+ processTrace.addEvent(readerName, "COLLECTION_READER_TIME",
+ String.valueOf(totalCollectionReaderTime), 0, null);
}
for (int i = 0; i < colReaderProgress.length; i++) {
if (Progress.BYTES.equals(colReaderProgress[i].getUnit())) {
- processTrace.addEvent(readerName, Constants.COLLECTION_READER_BYTES_PROCESSED, String
- .valueOf(colReaderProgress[i].getCompleted()), 0, null);
+ processTrace.addEvent(readerName, Constants.COLLECTION_READER_BYTES_PROCESSED,
+ String.valueOf(colReaderProgress[i].getCompleted()), 0, null);
} else if (Progress.ENTITIES.equals(colReaderProgress[i].getUnit())) {
- processTrace.addEvent(readerName, Constants.COLLECTION_READER_DOCS_PROCESSED, String
- .valueOf(colReaderProgress[i].getCompleted()), 0, null);
+ processTrace.addEvent(readerName, Constants.COLLECTION_READER_DOCS_PROCESSED,
+ String.valueOf(colReaderProgress[i].getCompleted()), 0, null);
}
}
@@ -1133,8 +1197,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
}
copyComponentEvents("Process", eList, processTrace);
- processTrace.addEvent(container.getName(), "Documents Processed", String.valueOf(container
- .getProcessed()), 0, null);
+ processTrace.addEvent(container.getName(), "Documents Processed",
+ String.valueOf(container.getProcessed()), 0, null);
String status = decodeStatus(container.getStatus());
processTrace.addEvent(container.getName(), "Processor Status", status, 0, null);
@@ -1147,20 +1211,20 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
0, null);
int restartCount = container.getRestartCount();
- processTrace.addEvent(container.getName(), "Processor Restarts", String
- .valueOf(restartCount), 0, null);
+ processTrace.addEvent(container.getName(), "Processor Restarts",
+ String.valueOf(restartCount), 0, null);
int retryCount = container.getRetryCount();
processTrace.addEvent(container.getName(), "Processor Retries", String.valueOf(retryCount),
0, null);
int filteredCount = container.getFilteredCount();
- processTrace.addEvent(container.getName(), "Filtered Entities", String
- .valueOf(filteredCount), 0, null);
+ processTrace.addEvent(container.getName(), "Filtered Entities",
+ String.valueOf(filteredCount), 0, null);
long remainingCount = container.getRemaining();
- processTrace.addEvent(container.getName(), "Processor Remaining", String
- .valueOf(remainingCount), 0, null);
+ processTrace.addEvent(container.getName(), "Processor Remaining",
+ String.valueOf(remainingCount), 0, null);
HashMap aMap = container.getAllStats();
@@ -1181,16 +1245,15 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
"Custom String Stat-" + key + " Value=" + (String) o);
}
} else if (o instanceof Integer) {
- processTrace.addEvent(container.getName(), key, String.valueOf(((Integer) o)
- .intValue()), 0, null);
+ processTrace.addEvent(container.getName(), key,
+ String.valueOf(((Integer) o).intValue()), 0, null);
if (System.getProperty("SHOW_CUSTOM_STATS") != null) {
UIMAFramework.getLogger(this.getClass()).log(Level.FINEST,
"Custom Integer Stat-" + key + " Value=" + o);
- }
+ }
} else {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(this.getClass()).log(
- Level.FINEST,
+ UIMAFramework.getLogger(this.getClass()).log(Level.FINEST,
"Invalid Type Found When Generating Status For " + key + ". Type::"
+ o.getClass().getName()
+ " Not supported. Use Integer or String instead.");
@@ -1218,9 +1281,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
/**
* Creates the default process trace.
*
- * @param aProcessors the a processors
- * @param srcProcTr the src proc tr
- * @param aProcessTrace the a process trace
+ * @param aProcessors
+ * the a processors
+ * @param srcProcTr
+ * the src proc tr
+ * @param aProcessTrace
+ * the a process trace
*/
private void createDefaultProcessTrace(CasProcessor[] aProcessors, ProcessTrace srcProcTr,
ProcessTrace aProcessTrace) {
@@ -1253,10 +1319,11 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
/**
* Returns a CPE descriptor as a String.
*
- * @param aList -
- * list of components
+ * @param aList
+ * - list of components
* @return - descriptor populated with a given components
- * @throws ResourceConfigurationException the resource configuration exception
+ * @throws ResourceConfigurationException
+ * the resource configuration exception
*/
public String getDescriptor(List aList) throws ResourceConfigurationException {
return cpeFactory.getDescriptor(aList);
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java
index be1217b..f472b46 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java
@@ -88,13 +88,6 @@ public class Checkpoint implements Runnable {
}
/**
- * Start the thread.
- */
- public void start() {
- new Thread(this).start();
- }
-
- /**
* Stops the checkpoint thread.
*/
public void stop() {
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java
index 9a08962..afe544b 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java
@@ -24,16 +24,14 @@ import java.lang.reflect.Constructor;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.uima.UIMAFramework;
import org.apache.uima.adapter.vinci.util.Descriptor;
@@ -111,7 +109,9 @@ public class CPMEngine extends Thread {
/** The Constant SINGLE_THREADED_MODE. */
private static final String SINGLE_THREADED_MODE = "single-threaded";
- /** The cas pool. */
+ private final CPMExecutorService executorService;
+
+ /** The CAS pool. */
public CPECasPool casPool;
/** The lock for pause. */
@@ -215,6 +215,7 @@ public class CPMEngine extends Thread {
/** The producer. */
// work Queue
private ArtifactProducer producer = null;
+ private Future<?> producerResult;
/** The cpe factory. */
// Factory responsible for instantiating CPE components from CPE descriptor
@@ -223,12 +224,14 @@ public class CPMEngine extends Thread {
/** The processing units. */
// An array holding instances of components responsible for analysis
protected ProcessingUnit[] processingUnits = null;
+ protected Future<?>[] processingUnitResults = null;
// Instantiate a Processing Unit containing CasConsumers. There may be many Analysis Processing
// Units
/** The cas consumer PU. */
// but there is one CasConsumer Processing Unit ( at least for now).
private ProcessingUnit casConsumerPU = null;
+ private Future<?> casConsumerPUResult;
/** The output queue. */
// Queue where result of analysis goes to be consumed by Consumers
@@ -309,7 +312,7 @@ public class CPMEngine extends Thread {
* Initializes Collection Processing Engine. Assigns this thread and all processing threads
* created by this component to a common Thread Group.
*
- * @param aThreadGroup
+ * @param aExecutorService
* - contains all CPM related threads
* @param aCpeFactory
* - CPE factory object responsible for parsing cpe descriptor and creating components
@@ -320,9 +323,10 @@ public class CPMEngine extends Thread {
* @throws Exception
* the exception
*/
- public CPMEngine(CPMThreadGroup aThreadGroup, CPEFactory aCpeFactory, ProcessTrace aProcTr,
- CheckpointData aCheckpointData) throws Exception {
- super(aThreadGroup, "CPMEngine Thread");
+ public CPMEngine(CPMExecutorService aExecutorService, CPEFactory aCpeFactory,
+ ProcessTrace aProcTr, CheckpointData aCheckpointData) throws Exception {
+ setName("CPMEngine Thread");
+ executorService = aExecutorService;
cpeFactory = aCpeFactory;
// Accumulate trace info in provided ProcessTrace instance
procTr = aProcTr;
@@ -357,6 +361,10 @@ public class CPMEngine extends Thread {
}
}
+ CPMExecutorService getExecutorService() {
+ return executorService;
+ }
+
/**
* Returns a list of Processing Containers for Analysis Engines. Each CasProcessor is managed by
* its own container.
@@ -647,7 +655,8 @@ public class CPMEngine extends Thread {
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_terminate_pipelines__INFO",
new Object[] { Thread.currentThread().getName(), String.valueOf(killed) });
}
- new Thread() {
+
+ executorService.submit(new Thread() {
@Override
public void run() {
Object[] eofToken = new Object[1];
@@ -739,10 +748,8 @@ public class CPMEngine extends Thread {
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
new Object[] { Thread.currentThread().getName(), e.getMessage() });
}
-
}
- }.start();
-
+ });
}
/**
@@ -1686,7 +1693,7 @@ public class CPMEngine extends Thread {
private void startDebugControlThread() {
String dbgCtrlFile = System.getProperty("DEBUG_CONTROL");
dbgCtrlThread = new DebugControlThread(this, dbgCtrlFile, 1000);
- dbgCtrlThread.start();
+ executorService.submit(dbgCtrlThread);
}
/**
@@ -1840,13 +1847,8 @@ public class CPMEngine extends Thread {
new Object[] { Thread.currentThread().getName(), t.getMessage() });
return;
} finally {
- ((CPMThreadGroup) getThreadGroup()).cleanup();
- // Fix for memory leak. CPMThreadGroup must be
- // destroyed, but not until AFTER all threads that it
- // owns, including this one, have ended. - Adam
- final ThreadGroup group = getThreadGroup();
- Thread threadGroupDestroyer = new ThreadGroupDestroyer(group);
- threadGroupDestroyer.start();
+ executorService.cleanup();
+ executorService.shutdown();
}
}
@@ -2015,7 +2017,7 @@ public class CPMEngine extends Thread {
// name the thread
casConsumerPU.setName("[CasConsumer Pipeline Thread]::");
// start the CasConsumer Thread
- casConsumerPU.start();
+ casConsumerPUResult = executorService.submit(casConsumerPU);
consumerThreadStarted = true;
}
if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
@@ -2068,6 +2070,7 @@ public class CPMEngine extends Thread {
// Setup Processing Pipelines
processingUnits = new ProcessingUnit[concurrentThreadCount];
+ processingUnitResults = new Future<?>[concurrentThreadCount];
synchronized (this) {
activeProcessingUnits = concurrentThreadCount; // keeps track of how many threads are still
// active. -Adam
@@ -2145,7 +2148,7 @@ public class CPMEngine extends Thread {
processingUnits[i].setName("[Procesing Pipeline#" + (i + 1) + " Thread]::");
// Start the Processing Pipeline
- processingUnits[i].start();
+ processingUnitResults[i] = executorService.submit(processingUnits[i]);
processingThreadsState[i] = 1; // Started
}
@@ -2153,7 +2156,7 @@ public class CPMEngine extends Thread {
// Start the ArtifactProducer thread and the Collection Reader embedded therein. The
// Collection Reader begins
// processing and deposits CASes onto a work queue.
- producer.start();
+ producerResult = executorService.submit(producer);
readerThreadStarted = true;
// Indicate that ALL threads making up the CPE have been started
@@ -2180,7 +2183,7 @@ public class CPMEngine extends Thread {
// simply terminates the thread. Once it terminates lets just make sure that
// all threads finish and the work queue is completely depleted and all entities
// are processed
- producer.join();
+ producerResult.get();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_thread_completed__FINEST",
@@ -2195,7 +2198,7 @@ public class CPMEngine extends Thread {
new Object[] { Thread.currentThread().getName(), processingUnits[i].getName(),
String.valueOf(i) });
}
- processingUnits[i].join();
+ processingUnitResults[i].get();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_pu_complete__FINEST",
@@ -2250,7 +2253,7 @@ public class CPMEngine extends Thread {
}
- casConsumerPU.join();
+ casConsumerPUResult.get();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_cc_completed__FINEST",
@@ -2361,9 +2364,9 @@ public class CPMEngine extends Thread {
if (producer != null && !producer.isRunning()) {
try {
if (!readerThreadStarted) {
- producer.start();
+ producerResult = executorService.submit(producer);
}
- producer.join();
+ producerResult.get();
} catch (Exception ex1) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_exception__SEVERE",
@@ -2377,9 +2380,9 @@ public class CPMEngine extends Thread {
if (casConsumerPU != null && !casConsumerPU.isRunning()) {
try {
if (!consumerThreadStarted) {
- casConsumerPU.start();
+ casConsumerPUResult = executorService.submit(casConsumerPU);
}
- casConsumerPU.join();
+ casConsumerPUResult.get();
} catch (Exception ex1) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cc_exception__SEVERE",
@@ -2412,10 +2415,10 @@ public class CPMEngine extends Thread {
// In such a case 'processingThreadsState[i] = -1'
if (processingThreadsState[i] == -1 && !processingUnits[i].isRunning()) {
- processingUnits[i].start();
+ processingUnitResults[i] = executorService.submit(processingUnits[i]);
}
try {
- processingUnits[i].join();
+ processingUnitResults[i].get();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
@@ -2485,9 +2488,11 @@ public class CPMEngine extends Thread {
notifyListenersWithException(e);
}
try {
- casConsumerPU.join();
+ casConsumerPUResult.get();
} catch (InterruptedException e) {
+ } catch (ExecutionException e) {
+ e.printStackTrace();
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
@@ -2496,12 +2501,7 @@ public class CPMEngine extends Thread {
}
}
- // Fix for memory leak. CPMThreadGroup must be
- // destroyed, but not until AFTER all threads that it
- // owns, including this one, have ended. - Adam
- final ThreadGroup group = getThreadGroup();
- Thread threadGroupDestroyer = new ThreadGroupDestroyer(group);
- threadGroupDestroyer.start();
+ executorService.shutdown();
}
}
@@ -2593,6 +2593,7 @@ public class CPMEngine extends Thread {
producer.cleanup();
}
producer = null;
+ producerResult = null;
if (consumerDeployList != null) {
consumerDeployList.clear();
@@ -2625,6 +2626,7 @@ public class CPMEngine extends Thread {
consumers = null;
processingUnits = null;
+ processingUnitResults = null;
casprocessorList = null;
// this.enProcSt = null;
stats = null;
@@ -3580,74 +3582,74 @@ public class CPMEngine extends Thread {
}
}
- private static class ThreadGroupDestroyer extends Thread {
- private Set<Thread> foreignThreadsBlockingShutdown = new LinkedHashSet<>();
-
- private final ThreadGroup group;
-
- private final AtomicInteger count = new AtomicInteger();
-
- public ThreadGroupDestroyer(ThreadGroup aGroup) {
- super(aGroup.getParent(), "threadGroupDestroyer");
-
- group = aGroup;
-
- }
-
- @Override
- public void run() {
-
- while (group.activeCount() > 0) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- }
- Thread[] threads = new Thread[group.activeCount()];
- group.enumerate(threads);
- showThreads(threads, foreignThreadsBlockingShutdown, count);
- }
-
- if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
- "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
- "UIMA_CPM_destroy_thread_group__FINEST",
- new Object[] { Thread.currentThread().getName() });
- }
-
- group.destroy();
- }
-
- private void showThreads(Thread[] aThreadList, Set<Thread> foreignThreadsBlockingShutdown,
- AtomicInteger count) {
- count.incrementAndGet();
-
- if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
- for (int i = 0; aThreadList != null && i < aThreadList.length; i++) {
- if (aThreadList[i] != null) {
- UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
- "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_show_thread__FINEST",
- new Object[] { Thread.currentThread().getName(), String.valueOf(i),
- aThreadList[i].getName() });
- }
- }
- }
-
- if (count.get() % 5 == 0) {
- for (Thread thread : aThreadList) {
- Set<Thread> nonUimaThreads = new HashSet<>();
- if (!(thread instanceof ProcessingUnit) && !(thread instanceof ArtifactProducer)
- && !foreignThreadsBlockingShutdown.contains(thread)) {
- nonUimaThreads.add(thread);
- }
-
- if (nonUimaThreads.size() == aThreadList.length) {
- UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
- "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unknown_thread__WARNING",
- new Object[] { thread.getName() });
- foreignThreadsBlockingShutdown.add(thread);
- }
- }
- }
- }
- }
+ // private static class ThreadGroupDestroyer extends Thread {
+ // private Set<Thread> foreignThreadsBlockingShutdown = new LinkedHashSet<>();
+ //
+ // private final ThreadGroup group;
+ //
+ // private final AtomicInteger count = new AtomicInteger();
+ //
+ // public ThreadGroupDestroyer(ThreadGroup aGroup) {
+ // super(aGroup.getParent(), "threadGroupDestroyer");
+ //
+ // group = aGroup;
+ //
+ // }
+ //
+ // @Override
+ // public void run() {
+ //
+ // while (group.activeCount() > 0) {
+ // try {
+ // Thread.sleep(100);
+ // } catch (InterruptedException e) {
+ // }
+ // Thread[] threads = new Thread[group.activeCount()];
+ // group.enumerate(threads);
+ // showThreads(threads, foreignThreadsBlockingShutdown, count);
+ // }
+ //
+ // if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+ // UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
+ // "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
+ // "UIMA_CPM_destroy_thread_group__FINEST",
+ // new Object[] { Thread.currentThread().getName() });
+ // }
+ //
+ // group.destroy();
+ // }
+ //
+ // private void showThreads(Thread[] aThreadList, Set<Thread> foreignThreadsBlockingShutdown,
+ // AtomicInteger count) {
+ // count.incrementAndGet();
+ //
+ // if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+ // for (int i = 0; aThreadList != null && i < aThreadList.length; i++) {
+ // if (aThreadList[i] != null) {
+ // UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
+ // "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_show_thread__FINEST",
+ // new Object[] { Thread.currentThread().getName(), String.valueOf(i),
+ // aThreadList[i].getName() });
+ // }
+ // }
+ // }
+ //
+ // if (count.get() % 5 == 0) {
+ // for (Thread thread : aThreadList) {
+ // Set<Thread> nonUimaThreads = new HashSet<>();
+ // if (!(thread instanceof ProcessingUnit) && !(thread instanceof ArtifactProducer)
+ // && !foreignThreadsBlockingShutdown.contains(thread)) {
+ // nonUimaThreads.add(thread);
+ // }
+ //
+ // if (nonUimaThreads.size() == aThreadList.length) {
+ // UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
+ // "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unknown_thread__WARNING",
+ // new Object[] { thread.getName() });
+ // foreignThreadsBlockingShutdown.add(thread);
+ // }
+ // }
+ // }
+ // }
+ // }
}
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMThreadGroup.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMExecutorService.java
similarity index 55%
rename from uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMThreadGroup.java
rename to uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMExecutorService.java
index c01f7df..f676145 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMThreadGroup.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMExecutorService.java
@@ -19,105 +19,104 @@
package org.apache.uima.collection.impl.cpm.engine;
-import java.util.ArrayList;
+import static org.apache.uima.UIMAFramework.getLogger;
+import static org.apache.uima.collection.impl.cpm.utils.CPMUtils.CPM_LOG_RESOURCE_BUNDLE;
+import static org.apache.uima.util.Level.FINER;
+import static org.apache.uima.util.Level.SEVERE;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
-import org.apache.uima.UIMAFramework;
import org.apache.uima.collection.StatusCallbackListener;
import org.apache.uima.collection.base_cpm.BaseStatusCallbackListener;
import org.apache.uima.collection.impl.EntityProcessStatusImpl;
-import org.apache.uima.collection.impl.cpm.utils.CPMUtils;
-import org.apache.uima.util.Level;
import org.apache.uima.util.ProcessTrace;
-
/**
* This component catches uncaught errors in the CPM. All critical threads in the CPM are part of
* this ThreadGroup. If OutOfMemory Error is thrown this component is notified by the JVM and its
* job is to notify registered listeners.
- *
*/
-public class CPMThreadGroup extends ThreadGroup {
-
- /** The callback listeners. */
- private ArrayList callbackListeners = null;
+public class CPMExecutorService extends ThreadPoolExecutor {
- /** The proc tr. */
- private ProcessTrace procTr = null;
+ private List<BaseStatusCallbackListener> callbackListeners = null;
- /**
- * Instantiates a new CPM thread group.
- *
- * @param name the name
- */
- public CPMThreadGroup(String name) {
- super(name);
- }
+ private ProcessTrace procTr = null;
- /**
- * Instantiates a new CPM thread group.
- *
- * @param parent -
- * parent thread group
- * @param name -
- * name of this thread group
- */
- public CPMThreadGroup(ThreadGroup parent, String name) {
- super(parent, name);
+ public CPMExecutorService() {
+ super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
/**
* Sets listeners to be used in notifications.
*
- * @param aListenerList -
- * list of registered listners
+ * @param aListenerList
+ * list of registered listeners
*/
- public void setListeners(ArrayList aListenerList) {
+ public void setListeners(List<BaseStatusCallbackListener> aListenerList) {
callbackListeners = aListenerList;
}
/**
* Sets the process trace.
*
- * @param aProcessTrace the new process trace
+ * @param aProcessTrace
+ * the new process trace
*/
public void setProcessTrace(ProcessTrace aProcessTrace) {
procTr = aProcessTrace;
}
- /* (non-Javadoc)
- * @see java.lang.ThreadGroup#uncaughtException(java.lang.Thread, java.lang.Throwable)
- */
@Override
- public void uncaughtException(Thread t, Throwable e) {
- if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
- UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
- "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unhandled_error__SEVERE",
- new Object[] { Thread.currentThread().getName(), e.getClass().getName() });
+ protected void afterExecute(Runnable aThread, Throwable aThrowable) {
+ Throwable throwable = aThrowable;
+ if (throwable == null && aThread instanceof FutureTask) {
+ try {
+ ((FutureTask<?>) aThread).get();
+ } catch (InterruptedException e) {
+ // Ignore
+ } catch (ExecutionException e) {
+ throwable = e.getCause();
+ }
+ }
+
+ if (throwable == null) {
+ return;
+ }
+
+ if (getLogger().isLoggable(SEVERE)) {
+ getLogger(this.getClass()).logrb(SEVERE, this.getClass().getName(), "process",
+ CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unhandled_error__SEVERE",
+ new Object[] { Thread.currentThread().getName(), throwable.getClass().getName() });
}
try {
// Notify listeners
for (int i = 0; callbackListeners != null && i < callbackListeners.size(); i++) {
- // System.out.println("ThreadGroup.uncaughtException()-Got Error - Notifying Listener");
- notifyListener((BaseStatusCallbackListener) callbackListeners.get(i), e);
+ notifyListener((BaseStatusCallbackListener) callbackListeners.get(i), throwable);
}
} catch (Throwable tr) {
- if (UIMAFramework.getLogger().isLoggable(Level.FINER)) {
- UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
- "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
+ if (getLogger().isLoggable(FINER)) {
+ getLogger(this.getClass()).logrb(FINER, this.getClass().getName(), "process",
+ CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
new Object[] { Thread.currentThread().getName(), tr.getClass().getName() });
tr.printStackTrace();
}
}
- // System.out.println("ThreadGroup.uncaughtException()-Done Handling Error");
}
/**
* Notify listener.
*
- * @param aStatCL the a stat CL
- * @param e the e
+ * @param aStatCL
+ * the a stat CL
+ * @param e
+ * the e
*/
private void notifyListener(BaseStatusCallbackListener aStatCL, Throwable e) {
EntityProcessStatusImpl enProcSt = new EntityProcessStatusImpl(procTr);
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java
index 806fe99..b380675 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java
@@ -29,12 +29,11 @@ import org.apache.uima.collection.impl.cpm.utils.CpmLocalizedMessage;
import org.apache.uima.util.FileUtils;
import org.apache.uima.util.Level;
-
/**
* The Class DebugControlThread.
*/
public class DebugControlThread implements Runnable {
-
+
/** The Constant NOTFOUND. */
private final static String NOTFOUND = "NOT-FOUND";
@@ -61,9 +60,12 @@ public class DebugControlThread implements Runnable {
/**
* Instantiates a new debug control thread.
*
- * @param aCpm the a cpm
- * @param aFilename the a filename
- * @param aCheckpointFrequency the a checkpoint frequency
+ * @param aCpm
+ * the a cpm
+ * @param aFilename
+ * the a filename
+ * @param aCheckpointFrequency
+ * the a checkpoint frequency
*/
public DebugControlThread(CPMEngine aCpm, String aFilename, int aCheckpointFrequency) {
cpm = aCpm;
@@ -74,7 +76,8 @@ public class DebugControlThread implements Runnable {
/**
* Start.
*
- * @throws RuntimeException the runtime exception
+ * @throws RuntimeException
+ * the runtime exception
*/
public void start() throws RuntimeException {
if (fileName == null) {
@@ -84,10 +87,10 @@ public class DebugControlThread implements Runnable {
"UIMA_CPM_checkpoint_target_not_defined__FINEST",
new Object[] { Thread.currentThread().getName() });
}
- throw new RuntimeException(CpmLocalizedMessage.getLocalizedMessage(
- CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
- "UIMA_CPM_EXP_target_checkpoint_not_defined__WARNING", new Object[] { Thread
- .currentThread().getName() }));
+ throw new RuntimeException(
+ CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
+ "UIMA_CPM_EXP_target_checkpoint_not_defined__WARNING",
+ new Object[] { Thread.currentThread().getName() }));
}
if (cpm == null) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
@@ -100,7 +103,7 @@ public class DebugControlThread implements Runnable {
CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_EXP_invalid_cpm__WARNING",
new Object[] { Thread.currentThread().getName() }));
} else {
- new Thread(this).start();
+ cpm.getExecutorService().submit(this);
}
}
@@ -113,7 +116,9 @@ public class DebugControlThread implements Runnable {
doCheckpoint();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see java.lang.Runnable#run()
*/
@Override
@@ -147,7 +152,8 @@ public class DebugControlThread implements Runnable {
/**
* Interpret and execute command.
*
- * @param aCommand the a command
+ * @param aCommand
+ * the a command
*/
private void interpretAndExecuteCommand(String aCommand) {
if (aCommand == null) {