You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by re...@apache.org on 2022/01/19 12:38:10 UTC

[uima-uimaj] branch feature/UIMA-6412-Stop-using-ThreadGroup-in-CPMEngine created (now ac4221f)

This is an automated email from the ASF dual-hosted git repository.

rec pushed a change to branch feature/UIMA-6412-Stop-using-ThreadGroup-in-CPMEngine
in repository https://gitbox.apache.org/repos/asf/uima-uimaj.git.


      at ac4221f  [UIMA-6412] Stop using ThreadGroup in CPMEngine

This branch includes the following new commits:

     new ac4221f  [UIMA-6412] Stop using ThreadGroup in CPMEngine

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[uima-uimaj] 01/01: [UIMA-6412] Stop using ThreadGroup in CPMEngine

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rec pushed a commit to branch feature/UIMA-6412-Stop-using-ThreadGroup-in-CPMEngine
in repository https://gitbox.apache.org/repos/asf/uima-uimaj.git

commit ac4221f5d7bd33704896aeffd1f772740926de81
Author: Richard Eckart de Castilho <re...@apache.org>
AuthorDate: Wed Jan 19 13:38:04 2022 +0100

    [UIMA-6412] Stop using ThreadGroup in CPMEngine
    
    - Refactor the CPMThreadGroup into a CPMExecutionService
    - Replace Thread.join() calls with Future.get() calls
    - Replace the ThreadGroupDestroyer thread a simple call to shutdown()
    - Replace the uncaught exception handler with overriding ThreadPoolExecutor.afterExecute()
    - Clean up and format few classes
---
 .../uima/collection/impl/cpm/BaseCPMImpl.java      | 375 ++++++++++++---------
 .../uima/collection/impl/cpm/Checkpoint.java       |   7 -
 .../uima/collection/impl/cpm/engine/CPMEngine.java | 222 ++++++------
 ...CPMThreadGroup.java => CPMExecutorService.java} |  99 +++---
 .../impl/cpm/engine/DebugControlThread.java        |  32 +-
 5 files changed, 401 insertions(+), 334 deletions(-)

diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java
index 972e7ee..05fa071 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java
@@ -21,13 +21,13 @@ package org.apache.uima.collection.impl.cpm;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.UIMARuntimeException;
@@ -44,7 +44,7 @@ import org.apache.uima.collection.impl.base_cpm.container.ProcessingContainer;
 import org.apache.uima.collection.impl.cpm.container.CPEFactory;
 import org.apache.uima.collection.impl.cpm.container.deployer.socket.ProcessControllerAdapter;
 import org.apache.uima.collection.impl.cpm.engine.CPMEngine;
-import org.apache.uima.collection.impl.cpm.engine.CPMThreadGroup;
+import org.apache.uima.collection.impl.cpm.engine.CPMExecutorService;
 import org.apache.uima.collection.impl.cpm.utils.CPMUtils;
 import org.apache.uima.collection.impl.cpm.utils.CasMetaData;
 import org.apache.uima.collection.impl.cpm.utils.CpmLocalizedMessage;
@@ -62,15 +62,12 @@ import org.apache.uima.util.Progress;
 import org.apache.uima.util.UimaTimer;
 import org.apache.uima.util.impl.ProcessTrace_impl;
 
-
 /**
  * Main thread that launches CPE and manages it. An application interacts with the running CPE via
  * this object. Through an API, an application may start, pause, resume, and stop a CPE.
- * 
- * 
  */
 public class BaseCPMImpl implements BaseCPM, Runnable {
-  
+
   /** The default process trace. */
   private boolean defaultProcessTrace;
 
@@ -107,55 +104,60 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /** The m event type map. */
   private Map mEventTypeMap;
 
-  /** The cpm thread group. */
-  public CPMThreadGroup cpmThreadGroup = null;
+  /** The CPE executor service. */
+  public CPMExecutorService cpmExecutorService = null;
 
   /**
    * Instantiates and initializes CPE Factory with a given CPE Descriptor and defaults.
    * 
-   * @param aDescriptor -
-   *          parsed CPE descriptor
-   * @throws Exception -
+   * @param aDescriptor
+   *          - parsed CPE descriptor
+   * @throws Exception
+   *           -
    */
   public BaseCPMImpl(CpeDescription aDescriptor) throws Exception {
     this(aDescriptor, null, true, UIMAFramework.getDefaultPerformanceTuningProperties());
-    cpmThreadGroup = new CPMThreadGroup("CPM Thread Group");
+    cpmExecutorService = new CPMExecutorService();
   }
 
   /**
    * Instantiates and initializes CPE Factory responsible for creating individual components that
    * are part of the processing pipeline.
    *
-   * @param aDescriptor -
-   *          parsed CPE descriptor
-   * @param aResourceManager -
-   *          ResourceManager instance to be used by the CPE
-   * @param aDefaultProcessTrace -
-   *          ProcessTrace instance to capture events and stats
-   * @param aProps the a props
-   * @throws Exception -
+   * @param aDescriptor
+   *          - parsed CPE descriptor
+   * @param aResourceManager
+   *          - ResourceManager instance to be used by the CPE
+   * @param aDefaultProcessTrace
+   *          - ProcessTrace instance to capture events and stats
+   * @param aProps
+   *          the a props
+   * @throws Exception
+   *           -
    */
   public BaseCPMImpl(CpeDescription aDescriptor, ResourceManager aResourceManager,
           boolean aDefaultProcessTrace, Properties aProps) throws Exception {
     cpeFactory = new CPEFactory(aDescriptor, aResourceManager);
     defaultProcessTrace = aDefaultProcessTrace;
-    cpmThreadGroup = new CPMThreadGroup("CPM Thread Group");
+    cpmExecutorService = new CPMExecutorService();
     init(false, aProps);
   }
 
   /**
    * Parses CPE descriptor.
    *
-   * @param mode -
-   *          indicates if the CPM should use a static descriptor or one provided
-   * @param aDescriptor -
-   *          provided descriptor path
-   * @param aResourceManager          ResourceManager to be used by CPM
-   * @throws Exception -
+   * @param mode
+   *          - indicates if the CPM should use a static descriptor or one provided
+   * @param aDescriptor
+   *          - provided descriptor path
+   * @param aResourceManager
+   *          ResourceManager to be used by CPM
+   * @throws Exception
+   *           -
    */
   public BaseCPMImpl(Boolean mode, String aDescriptor, ResourceManager aResourceManager)
           throws Exception {
-    cpmThreadGroup = new CPMThreadGroup("CPM Thread Group");
+    cpmExecutorService = new CPMExecutorService();
     cpeFactory = new CPEFactory(aResourceManager);
     if (mode == null) {
       defaultProcessTrace = true;
@@ -170,7 +172,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Plugs in custom perfomance tunning parameters.
    *
-   * @param aPerformanceTuningSettings the new performance tuning settings
+   * @param aPerformanceTuningSettings
+   *          the new performance tuning settings
    */
   public void setPerformanceTuningSettings(Properties aPerformanceTuningSettings) {
     cpEngine.setPerformanceTuningSettings(aPerformanceTuningSettings);
@@ -180,8 +183,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
    * Plugs in a given {@link ProcessControllerAdapter}. The CPM uses this adapter to request Cas
    * Processor restarts and shutdown.
    * 
-   * @param aPca -
-   *          instance of the ProcessControllerAdapter
+   * @param aPca
+   *          - instance of the ProcessControllerAdapter
    */
   public void setProcessControllerAdapter(ProcessControllerAdapter aPca) {
     cpEngine.setProcessControllerAdapter(aPca);
@@ -192,7 +195,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
    * use at the end of processing. Jedii-style reporting shows a summary for this run. The CPM
    * default report shows more detail information.
    *
-   * @param aUseJediiReport the new jedii report
+   * @param aUseJediiReport
+   *          the new jedii report
    */
   public void setJediiReport(boolean aUseJediiReport) {
     mEventTypeMap = new HashMap();
@@ -205,9 +209,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Instantiates and initializes a CPE.
    *
-   * @param aDummyCasProcessor -
-   * @param aProps the a props
-   * @throws Exception -
+   * @param aDummyCasProcessor
+   *          -
+   * @param aProps
+   *          the a props
+   * @throws Exception
+   *           -
    */
   public void init(boolean aDummyCasProcessor, Properties aProps) throws Exception {
     String uimaTimerClass = cpeFactory.getCPEConfig().getTimerImpl();
@@ -239,8 +246,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     }
     if (checkpointFileName != null && checkpointFileName.trim().length() > 0) {
       File checkpointFile = new File(checkpointFileName);
-      checkpoint = new Checkpoint(this, checkpointFileName, cpeFactory.getCPEConfig()
-              .getCheckpoint().getFrequency());
+      checkpoint = new Checkpoint(this, checkpointFileName,
+              cpeFactory.getCPEConfig().getCheckpoint().getFrequency());
       // Check if the checkpoint file already exists. If it does, the CPM did not complete
       // successfully during the previous run and CPM will start in recovery mode, restoring all
       // totals and status's from the recovered checkpoint. The processing pipeline state will
@@ -259,7 +266,7 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
       }
     }
     // Instantiate class responsible for processing
-    cpEngine = new CPMEngine(cpmThreadGroup, cpeFactory, procTr, checkpointData);
+    cpEngine = new CPMEngine(cpmExecutorService, cpeFactory, procTr, checkpointData);
     if (!aDummyCasProcessor) {
       int concurrentThreadCount = cpeFactory.getCpeDescriptor().getCpeCasProcessors()
               .getConcurrentPUCount();
@@ -267,12 +274,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
         CasProcessor[] casProcessors = cpeFactory.getCasProcessors();
         for (int i = 0; i < casProcessors.length; i++) {
           if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
-            UIMAFramework.getLogger(this.getClass()).logrb(
-                    Level.CONFIG,
-                    this.getClass().getName(),
-                    "process",
-                    CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-                    "UIMA_CPM_add_cp__CONFIG",
+            UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
+                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_add_cp__CONFIG",
                     new Object[] { Thread.currentThread().getName(),
                         casProcessors[i].getProcessingResourceMetaData().getName() });
           }
@@ -299,8 +302,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
       cpEngine.setInputQueueSize(casPoolSize == 0 ? iqSize : casPoolSize);
     } catch (NumberFormatException e) {
       throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-              "UIMA_CPM_EXP_queue_size_not_defined__WARNING", new Object[] {
-                  Thread.currentThread().getName(), "inputQueueSize" }));
+              "UIMA_CPM_EXP_queue_size_not_defined__WARNING",
+              new Object[] { Thread.currentThread().getName(), "inputQueueSize" }));
     }
     try {
       int oqSize = 0;
@@ -310,16 +313,17 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
       cpEngine.setOutputQueueSize(casPoolSize == 0 ? oqSize : casPoolSize + 2);
     } catch (NumberFormatException e) {
       throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-              "UIMA_CPM_EXP_queue_size_not_defined__WARNING", new Object[] {
-                  Thread.currentThread().getName(), "outputQueueSize" }));
+              "UIMA_CPM_EXP_queue_size_not_defined__WARNING",
+              new Object[] { Thread.currentThread().getName(), "outputQueueSize" }));
     }
     try {
       int threadCount = cpeFactory.getCpeDescriptor().getCpeCasProcessors().getConcurrentPUCount();
       cpEngine.setConcurrentThreadSize(threadCount);
     } catch (NumberFormatException e) {
       throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-              "UIMA_CPM_EXP_invalid_component_reference__WARNING", new Object[] {
-                  Thread.currentThread().getName(), "casProcessors", "processingUnitThreadCount" }));
+              "UIMA_CPM_EXP_invalid_component_reference__WARNING",
+              new Object[] { Thread.currentThread().getName(), "casProcessors",
+                  "processingUnitThreadCount" }));
     }
   }
 
@@ -327,13 +331,16 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
    * Returns {@link CPEConfig} object holding current CPE configuration.
    *
    * @return CPEConfig instance
-   * @throws Exception -
+   * @throws Exception
+   *           -
    */
   public CpeConfiguration getCPEConfig() throws Exception {
     return cpeFactory.getCPEConfig();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#getCasProcessors()
    */
   /*
@@ -347,29 +354,40 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     return casProcs == null ? new CasProcessor[0] : casProcs;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+   * .CasProcessor)
    */
   /*
    * Adds given CasProcessor to the processing pipeline. A new CasProcessor is appended to the
    * current list.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+   * .CasProcessor)
    */
   @Override
   public void addCasProcessor(CasProcessor aCasProcessor) throws ResourceConfigurationException {
     cpEngine.addCasProcessor(aCasProcessor);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor, int)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+   * .CasProcessor, int)
    */
   /*
    * Adds given CasProcessor to the processing pipeline. A new CasProcessor is inserted into a given
    * spot in the current list.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor,
-   *      int)
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+   * .CasProcessor, int)
    */
   @Override
   public void addCasProcessor(CasProcessor aCasProcessor, int aIndex)
@@ -377,26 +395,34 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     cpEngine.addCasProcessor(aCasProcessor, aIndex);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.
+   * base_cpm.CasProcessor)
    */
   /*
    * Removes given CasProcessor from the processing pipeline.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.
+   * base_cpm.CasProcessor)
    */
   @Override
   public void removeCasProcessor(CasProcessor aCasProcessor) {
     cpEngine.removeCasProcessor(0);
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(java.lang.String)
    */
   /*
    * Disables given CasProcessor in the existing processing pipeline.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.
+   * base_cpm.CasProcessor)
    */
   @Override
   public void disableCasProcessor(String aCasProcessorName) {
@@ -407,19 +433,24 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Enable cas processor.
    *
-   * @param aCasProcessorName the a cas processor name
+   * @param aCasProcessorName
+   *          the a cas processor name
    */
   /*
    * Disables given CasProcessor in the existing processing pipeline.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.
+   * base_cpm.CasProcessor)
    */
   public void enableCasProcessor(String aCasProcessorName) {
 
     cpEngine.enableCasProcessor(aCasProcessorName);
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#isSerialProcessingRequired()
    */
   /*
@@ -440,7 +471,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   public void setSerialProcessingRequired(boolean aRequired) {
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#isPauseOnException()
    */
   /*
@@ -453,7 +486,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     return cpEngine.isPauseOnException();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#setPauseOnException(boolean)
    */
   /*
@@ -466,34 +501,44 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     cpEngine.setPauseOnException(aPause);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.
+   * collection.base_cpm.BaseStatusCallbackListener)
    */
   /*
    * Adds Event Listener. Important events like, end of entity processing, exceptions, etc will be
    * sent to the registered listeners.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.
+   * collection.base_cpm.BaseStatusCallbackListener)
    */
   @Override
   public void addStatusCallbackListener(BaseStatusCallbackListener aListener) {
     cpEngine.addStatusCallbackListener(aListener);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.
+   * collection.base_cpm.BaseStatusCallbackListener)
    */
   /*
    * Remoces named listener from the listener list.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.
+   * collection.base_cpm.BaseStatusCallbackListener)
    */
   @Override
   public void removeStatusCallbackListener(BaseStatusCallbackListener aListener) {
     cpEngine.removeStatusCallbackListener(aListener);
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see java.lang.Runnable#run()
    */
   /*
@@ -523,11 +568,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
 
       // Start the checkpoint thread
       if (checkpoint != null) {
-        new Thread(checkpoint).start();
+        cpmExecutorService.submit(checkpoint);
       }
-      cpEngine.start();
-      // Joing the CPMWorker Thread and wait until it finishes
-      cpEngine.join();
+
+      Future<?> cpEngineResult = cpmExecutorService.submit(cpEngine);
+      // Joining the CPMWorker Thread and wait until it finishes
+      cpEngineResult.get();
 
       completed = true;
       // If the entire collection has been processed there is no need for a checkpoint.
@@ -569,14 +615,13 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
       }
       UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "" + e);
       killed = true;
-      ArrayList statusCbL = cpEngine.getCallbackListeners();
+      List<BaseStatusCallbackListener> statusCbL = cpEngine.getCallbackListeners();
       EntityProcessStatusImpl enProcSt = new EntityProcessStatusImpl(procTr, true);
       // e is the actual exception.
       enProcSt.addEventStatus("CPM", "Failed", e);
 
       // Notify all listeners that the CPM has finished processing
-      for (int j = 0; j < statusCbL.size(); j++) {
-        BaseStatusCallbackListener st = (BaseStatusCallbackListener) statusCbL.get(j);
+      for (BaseStatusCallbackListener st : statusCbL) {
         if (st != null && st instanceof StatusCallbackListener) {
           ((StatusCallbackListener) st).entityProcessComplete(null, enProcSt);
         }
@@ -592,11 +637,11 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
               "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cpm_stopped__FINEST",
               new Object[] { Thread.currentThread().getName(), String.valueOf(killed) });
     }
-    ArrayList statusCbL = cpEngine.getCallbackListeners();
+
+    List<BaseStatusCallbackListener> statusCbL = cpEngine.getCallbackListeners();
     // Notify all listeners that the CPM has finished processing
-    for (int j = 0; j < statusCbL.size(); j++) {
-      BaseStatusCallbackListener st = (BaseStatusCallbackListener) statusCbL.get(j);
-      if ( st != null ) {
+    for (BaseStatusCallbackListener st : statusCbL) {
+      if (st != null) {
         if (!killed) {
           st.collectionProcessComplete();
         } else {
@@ -619,10 +664,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
    * This method is called by an applications to begin CPM processing with a given Collection. It
    * just creates a new thread and starts it.
    *
-   * @param aCollectionReader the a collection reader
-   * @throws ResourceInitializationException the resource initialization exception
+   * @param aCollectionReader
+   *          the a collection reader
+   * @throws ResourceInitializationException
+   *           the resource initialization exception
    * @see org.apache.uima.collection.base_cpm.BaseCPM#process()
-   * @deprecated 
+   * @deprecated
    */
   @Deprecated
   public void process(BaseCollectionReader aCollectionReader)
@@ -637,9 +684,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     if (cpeFactory.isDefault()) {
       cpeFactory.addCollectionReader(collectionReader);
     }
-    cpmThreadGroup.setProcessTrace(procTr);
-    cpmThreadGroup.setListeners(cpEngine.getCallbackListeners());
-    new Thread(this).start();
+    cpmExecutorService.setProcessTrace(procTr);
+    cpmExecutorService.setListeners(cpEngine.getCallbackListeners());
+    cpmExecutorService.submit(this);
   }
 
   /**
@@ -664,21 +711,24 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     if (cpeFactory.isDefault()) {
       cpeFactory.addCollectionReader(collectionReader);
     }
-    cpmThreadGroup.setProcessTrace(procTr);
-    cpmThreadGroup.setListeners(cpEngine.getCallbackListeners());
+    cpmExecutorService.setProcessTrace(procTr);
+    cpmExecutorService.setListeners(cpEngine.getCallbackListeners());
 
-    new Thread(this).start();
+    cpmExecutorService.submit(this);
   }
 
   /**
    * This method is called by an applications to begin CPM processing with a given Collection. It
    * just creates a new thread and starts it.
    *
-   * @param aCollectionReader the a collection reader
-   * @param aBatchSize the a batch size
-   * @throws ResourceInitializationException the resource initialization exception
+   * @param aCollectionReader
+   *          the a collection reader
+   * @param aBatchSize
+   *          the a batch size
+   * @throws ResourceInitializationException
+   *           the resource initialization exception
    * @see org.apache.uima.collection.base_cpm.BaseCPM#process()
-   * @deprecated 
+   * @deprecated
    */
   @Deprecated
   public void process(BaseCollectionReader aCollectionReader, int aBatchSize)
@@ -689,10 +739,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     if (cpeFactory.isDefault()) {
       cpeFactory.addCollectionReader(collectionReader);
     }
-    cpmThreadGroup.setProcessTrace(procTr);
-    cpmThreadGroup.setListeners(cpEngine.getCallbackListeners());
-
-    new Thread(cpmThreadGroup, this).start();
+    cpmExecutorService.setProcessTrace(procTr);
+    cpmExecutorService.setListeners(cpEngine.getCallbackListeners());
+    cpmExecutorService.submit(this);
   }
 
   /**
@@ -726,7 +775,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     }
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#isProcessing()
    */
   /*
@@ -740,7 +791,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     return cpEngine.isRunning();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#pause()
    */
   /*
@@ -757,7 +810,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     }
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#isPaused()
    */
   /*
@@ -770,7 +825,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     return cpEngine.isPaused();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#resume(boolean)
    */
   /*
@@ -783,7 +840,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     resume();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#resume()
    */
   /*
@@ -825,7 +884,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
 
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#stop()
    */
   /*
@@ -888,7 +949,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Decode status.
    *
-   * @param aStatus the a status
+   * @param aStatus
+   *          the a status
    * @return the string
    */
   /*
@@ -920,10 +982,14 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Copy component events.
    *
-   * @param aEvType the a ev type
-   * @param aList the a list
-   * @param aPTr the a P tr
-   * @throws IOException Signals that an I/O exception has occurred.
+   * @param aEvType
+   *          the a ev type
+   * @param aList
+   *          the a list
+   * @param aPTr
+   *          the a P tr
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
    */
   /*
    * Copies events of a given type found in the list to a provided ProcessTrace instance
@@ -946,10 +1012,10 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Helper method to display stats and totals.
    *
-   * @param aProcessTrace -
-   *          trace containing stats
-   * @param aNumDocsProcessed -
-   *          number of entities processed so far
+   * @param aProcessTrace
+   *          - trace containing stats
+   * @param aNumDocsProcessed
+   *          - number of entities processed so far
    */
   public void displayStats(ProcessTrace aProcessTrace, int aNumDocsProcessed) {
     if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
@@ -965,10 +1031,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
       // get the time.
       if ("CPM".equals(event.getComponentName())) {
         if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
-          UIMAFramework.getLogger(this.getClass()).log(
-                  Level.FINEST,
-                  "Current Component::" + event.getComponentName() + " Time::"
-                          + event.getDuration());
+          UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, "Current Component::"
+                  + event.getComponentName() + " Time::" + event.getDuration());
         }
         continue;
       }
@@ -1000,8 +1064,10 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Helper method to help build the CPM report.
    *
-   * @param aEvent the a event
-   * @param aTotalTime the a total time
+   * @param aEvent
+   *          the a event
+   * @param aTotalTime
+   *          the a total time
    */
   public void buildEventTree(ProcessTraceEvent aEvent, int aTotalTime) {
     // Skip reporting the CPM time.This time has already been acquired by summing up
@@ -1019,10 +1085,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     }
 
     if (System.getProperty("DEBUG") != null) {
-      UIMAFramework.getLogger(this.getClass()).log(
-              Level.FINEST,
-              "" + pct + "% (" + duration + "ms) - " + aEvent.getComponentName() + " (" + type
-                      + ")");
+      UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, "" + pct + "% (" + duration
+              + "ms) - " + aEvent.getComponentName() + " (" + type + ")");
     }
     Iterator it = aEvent.getSubEvents().iterator();
     while (it.hasNext()) {
@@ -1103,16 +1167,16 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
         Long totalCollectionReaderTime = (Long) perfReport.get("COLLECTION_READER_TIME");
         String readerName = collectionReader.getProcessingResourceMetaData().getName();
         if (totalCollectionReaderTime != null) {
-          processTrace.addEvent(readerName, "COLLECTION_READER_TIME", String
-                  .valueOf(totalCollectionReaderTime), 0, null);
+          processTrace.addEvent(readerName, "COLLECTION_READER_TIME",
+                  String.valueOf(totalCollectionReaderTime), 0, null);
         }
         for (int i = 0; i < colReaderProgress.length; i++) {
           if (Progress.BYTES.equals(colReaderProgress[i].getUnit())) {
-            processTrace.addEvent(readerName, Constants.COLLECTION_READER_BYTES_PROCESSED, String
-                    .valueOf(colReaderProgress[i].getCompleted()), 0, null);
+            processTrace.addEvent(readerName, Constants.COLLECTION_READER_BYTES_PROCESSED,
+                    String.valueOf(colReaderProgress[i].getCompleted()), 0, null);
           } else if (Progress.ENTITIES.equals(colReaderProgress[i].getUnit())) {
-            processTrace.addEvent(readerName, Constants.COLLECTION_READER_DOCS_PROCESSED, String
-                    .valueOf(colReaderProgress[i].getCompleted()), 0, null);
+            processTrace.addEvent(readerName, Constants.COLLECTION_READER_DOCS_PROCESSED,
+                    String.valueOf(colReaderProgress[i].getCompleted()), 0, null);
           }
         }
 
@@ -1133,8 +1197,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
         }
         copyComponentEvents("Process", eList, processTrace);
 
-        processTrace.addEvent(container.getName(), "Documents Processed", String.valueOf(container
-                .getProcessed()), 0, null);
+        processTrace.addEvent(container.getName(), "Documents Processed",
+                String.valueOf(container.getProcessed()), 0, null);
         String status = decodeStatus(container.getStatus());
         processTrace.addEvent(container.getName(), "Processor Status", status, 0, null);
 
@@ -1147,20 +1211,20 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
                 0, null);
 
         int restartCount = container.getRestartCount();
-        processTrace.addEvent(container.getName(), "Processor Restarts", String
-                .valueOf(restartCount), 0, null);
+        processTrace.addEvent(container.getName(), "Processor Restarts",
+                String.valueOf(restartCount), 0, null);
 
         int retryCount = container.getRetryCount();
         processTrace.addEvent(container.getName(), "Processor Retries", String.valueOf(retryCount),
                 0, null);
 
         int filteredCount = container.getFilteredCount();
-        processTrace.addEvent(container.getName(), "Filtered Entities", String
-                .valueOf(filteredCount), 0, null);
+        processTrace.addEvent(container.getName(), "Filtered Entities",
+                String.valueOf(filteredCount), 0, null);
 
         long remainingCount = container.getRemaining();
-        processTrace.addEvent(container.getName(), "Processor Remaining", String
-                .valueOf(remainingCount), 0, null);
+        processTrace.addEvent(container.getName(), "Processor Remaining",
+                String.valueOf(remainingCount), 0, null);
 
         HashMap aMap = container.getAllStats();
 
@@ -1181,16 +1245,15 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
                           "Custom String Stat-" + key + " Value=" + (String) o);
                 }
               } else if (o instanceof Integer) {
-                processTrace.addEvent(container.getName(), key, String.valueOf(((Integer) o)
-                        .intValue()), 0, null);
+                processTrace.addEvent(container.getName(), key,
+                        String.valueOf(((Integer) o).intValue()), 0, null);
                 if (System.getProperty("SHOW_CUSTOM_STATS") != null) {
                   UIMAFramework.getLogger(this.getClass()).log(Level.FINEST,
                           "Custom Integer Stat-" + key + " Value=" + o);
-                } 
+                }
               } else {
                 if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
-                  UIMAFramework.getLogger(this.getClass()).log(
-                          Level.FINEST,
+                  UIMAFramework.getLogger(this.getClass()).log(Level.FINEST,
                           "Invalid Type Found When Generating Status For " + key + ". Type::"
                                   + o.getClass().getName()
                                   + " Not supported. Use Integer or String instead.");
@@ -1218,9 +1281,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Creates the default process trace.
    *
-   * @param aProcessors the a processors
-   * @param srcProcTr the src proc tr
-   * @param aProcessTrace the a process trace
+   * @param aProcessors
+   *          the a processors
+   * @param srcProcTr
+   *          the src proc tr
+   * @param aProcessTrace
+   *          the a process trace
    */
   private void createDefaultProcessTrace(CasProcessor[] aProcessors, ProcessTrace srcProcTr,
           ProcessTrace aProcessTrace) {
@@ -1253,10 +1319,11 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Returns a CPE descriptor as a String.
    *
-   * @param aList -
-   *          list of components
+   * @param aList
+   *          - list of components
    * @return - descriptor populated with a given components
-   * @throws ResourceConfigurationException the resource configuration exception
+   * @throws ResourceConfigurationException
+   *           the resource configuration exception
    */
   public String getDescriptor(List aList) throws ResourceConfigurationException {
     return cpeFactory.getDescriptor(aList);
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java
index be1217b..f472b46 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java
@@ -88,13 +88,6 @@ public class Checkpoint implements Runnable {
   }
 
   /**
-   * Start the thread.
-   */
-  public void start() {
-    new Thread(this).start();
-  }
-
-  /**
    * Stops the checkpoint thread.
    */
   public void stop() {
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java
index 9a08962..afe544b 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java
@@ -24,16 +24,14 @@ import java.lang.reflect.Constructor;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.adapter.vinci.util.Descriptor;
@@ -111,7 +109,9 @@ public class CPMEngine extends Thread {
   /** The Constant SINGLE_THREADED_MODE. */
   private static final String SINGLE_THREADED_MODE = "single-threaded";
 
-  /** The cas pool. */
+  private final CPMExecutorService executorService;
+
+  /** The CAS pool. */
   public CPECasPool casPool;
 
   /** The lock for pause. */
@@ -215,6 +215,7 @@ public class CPMEngine extends Thread {
   /** The producer. */
   // work Queue
   private ArtifactProducer producer = null;
+  private Future<?> producerResult;
 
   /** The cpe factory. */
   // Factory responsible for instantiating CPE components from CPE descriptor
@@ -223,12 +224,14 @@ public class CPMEngine extends Thread {
   /** The processing units. */
   // An array holding instances of components responsible for analysis
   protected ProcessingUnit[] processingUnits = null;
+  protected Future<?>[] processingUnitResults = null;
 
   // Instantiate a Processing Unit containing CasConsumers. There may be many Analysis Processing
   // Units
   /** The cas consumer PU. */
   // but there is one CasConsumer Processing Unit ( at least for now).
   private ProcessingUnit casConsumerPU = null;
+  private Future<?> casConsumerPUResult;
 
   /** The output queue. */
   // Queue where result of analysis goes to be consumed by Consumers
@@ -309,7 +312,7 @@ public class CPMEngine extends Thread {
    * Initializes Collection Processing Engine. Assigns this thread and all processing threads
    * created by this component to a common Thread Group.
    *
-   * @param aThreadGroup
+   * @param aExecutorService
    *          - contains all CPM related threads
    * @param aCpeFactory
    *          - CPE factory object responsible for parsing cpe descriptor and creating components
@@ -320,9 +323,10 @@ public class CPMEngine extends Thread {
    * @throws Exception
    *           the exception
    */
-  public CPMEngine(CPMThreadGroup aThreadGroup, CPEFactory aCpeFactory, ProcessTrace aProcTr,
-          CheckpointData aCheckpointData) throws Exception {
-    super(aThreadGroup, "CPMEngine Thread");
+  public CPMEngine(CPMExecutorService aExecutorService, CPEFactory aCpeFactory,
+          ProcessTrace aProcTr, CheckpointData aCheckpointData) throws Exception {
+    setName("CPMEngine Thread");
+    executorService = aExecutorService;
     cpeFactory = aCpeFactory;
     // Accumulate trace info in provided ProcessTrace instance
     procTr = aProcTr;
@@ -357,6 +361,10 @@ public class CPMEngine extends Thread {
     }
   }
 
+  CPMExecutorService getExecutorService() {
+    return executorService;
+  }
+
   /**
    * Returns a list of Processing Containers for Analysis Engines. Each CasProcessor is managed by
    * its own container.
@@ -647,7 +655,8 @@ public class CPMEngine extends Thread {
               "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_terminate_pipelines__INFO",
               new Object[] { Thread.currentThread().getName(), String.valueOf(killed) });
     }
-    new Thread() {
+
+    executorService.submit(new Thread() {
       @Override
       public void run() {
         Object[] eofToken = new Object[1];
@@ -739,10 +748,8 @@ public class CPMEngine extends Thread {
                   "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
                   new Object[] { Thread.currentThread().getName(), e.getMessage() });
         }
-
       }
-    }.start();
-
+    });
   }
 
   /**
@@ -1686,7 +1693,7 @@ public class CPMEngine extends Thread {
   private void startDebugControlThread() {
     String dbgCtrlFile = System.getProperty("DEBUG_CONTROL");
     dbgCtrlThread = new DebugControlThread(this, dbgCtrlFile, 1000);
-    dbgCtrlThread.start();
+    executorService.submit(dbgCtrlThread);
   }
 
   /**
@@ -1840,13 +1847,8 @@ public class CPMEngine extends Thread {
                 new Object[] { Thread.currentThread().getName(), t.getMessage() });
         return;
       } finally {
-        ((CPMThreadGroup) getThreadGroup()).cleanup();
-        // Fix for memory leak. CPMThreadGroup must be
-        // destroyed, but not until AFTER all threads that it
-        // owns, including this one, have ended. - Adam
-        final ThreadGroup group = getThreadGroup();
-        Thread threadGroupDestroyer = new ThreadGroupDestroyer(group);
-        threadGroupDestroyer.start();
+        executorService.cleanup();
+        executorService.shutdown();
       }
     }
 
@@ -2015,7 +2017,7 @@ public class CPMEngine extends Thread {
         // name the thread
         casConsumerPU.setName("[CasConsumer Pipeline Thread]::");
         // start the CasConsumer Thread
-        casConsumerPU.start();
+        casConsumerPUResult = executorService.submit(casConsumerPU);
         consumerThreadStarted = true;
       }
       if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
@@ -2068,6 +2070,7 @@ public class CPMEngine extends Thread {
 
       // Setup Processing Pipelines
       processingUnits = new ProcessingUnit[concurrentThreadCount];
+      processingUnitResults = new Future<?>[concurrentThreadCount];
       synchronized (this) {
         activeProcessingUnits = concurrentThreadCount; // keeps track of how many threads are still
         // active. -Adam
@@ -2145,7 +2148,7 @@ public class CPMEngine extends Thread {
         processingUnits[i].setName("[Procesing Pipeline#" + (i + 1) + " Thread]::");
 
         // Start the Processing Pipeline
-        processingUnits[i].start();
+        processingUnitResults[i] = executorService.submit(processingUnits[i]);
         processingThreadsState[i] = 1; // Started
       }
 
@@ -2153,7 +2156,7 @@ public class CPMEngine extends Thread {
       // Start the ArtifactProducer thread and the Collection Reader embedded therein. The
       // Collection Reader begins
       // processing and deposits CASes onto a work queue.
-      producer.start();
+      producerResult = executorService.submit(producer);
       readerThreadStarted = true;
 
       // Indicate that ALL threads making up the CPE have been started
@@ -2180,7 +2183,7 @@ public class CPMEngine extends Thread {
       // simply terminates the thread. Once it terminates lets just make sure that
       // all threads finish and the work queue is completely depleted and all entities
       // are processed
-      producer.join();
+      producerResult.get();
       if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
         UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                 "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_thread_completed__FINEST",
@@ -2195,7 +2198,7 @@ public class CPMEngine extends Thread {
                   new Object[] { Thread.currentThread().getName(), processingUnits[i].getName(),
                       String.valueOf(i) });
         }
-        processingUnits[i].join();
+        processingUnitResults[i].get();
         if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
           UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                   "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_pu_complete__FINEST",
@@ -2250,7 +2253,7 @@ public class CPMEngine extends Thread {
 
         }
 
-        casConsumerPU.join();
+        casConsumerPUResult.get();
         if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
           UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                   "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_cc_completed__FINEST",
@@ -2361,9 +2364,9 @@ public class CPMEngine extends Thread {
         if (producer != null && !producer.isRunning()) {
           try {
             if (!readerThreadStarted) {
-              producer.start();
+              executorService.submit(producer);
             }
-            producer.join();
+            producerResult.get();
           } catch (Exception ex1) {
             UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                     "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_exception__SEVERE",
@@ -2377,9 +2380,9 @@ public class CPMEngine extends Thread {
         if (casConsumerPU != null && !casConsumerPU.isRunning()) {
           try {
             if (!consumerThreadStarted) {
-              casConsumerPU.start();
+              executorService.submit(casConsumerPU);
             }
-            casConsumerPU.join();
+            casConsumerPUResult.get();
           } catch (Exception ex1) {
             UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                     "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cc_exception__SEVERE",
@@ -2412,10 +2415,10 @@ public class CPMEngine extends Thread {
               // In such a case 'processingThreadsState[i] = -1'
 
               if (processingThreadsState[i] == -1 && !processingUnits[i].isRunning()) {
-                processingUnits[i].start();
+                executorService.submit(processingUnits[i]);
               }
               try {
-                processingUnits[i].join();
+                processingUnitResults[i].get();
                 if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
                   UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                           this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
@@ -2485,9 +2488,11 @@ public class CPMEngine extends Thread {
           notifyListenersWithException(e);
         }
         try {
-          casConsumerPU.join();
+          casConsumerPUResult.get();
         } catch (InterruptedException e) {
 
+        } catch (ExecutionException e) {
+          e.printStackTrace();
         }
         if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
           UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
@@ -2496,12 +2501,7 @@ public class CPMEngine extends Thread {
         }
       }
 
-      // Fix for memory leak. CPMThreadGroup must be
-      // destroyed, but not until AFTER all threads that it
-      // owns, including this one, have ended. - Adam
-      final ThreadGroup group = getThreadGroup();
-      Thread threadGroupDestroyer = new ThreadGroupDestroyer(group);
-      threadGroupDestroyer.start();
+      executorService.shutdown();
     }
   }
 
@@ -2593,6 +2593,7 @@ public class CPMEngine extends Thread {
         producer.cleanup();
       }
       producer = null;
+      producerResult = null;
 
       if (consumerDeployList != null) {
         consumerDeployList.clear();
@@ -2625,6 +2626,7 @@ public class CPMEngine extends Thread {
       consumers = null;
 
       processingUnits = null;
+      processingUnitResults = null;
       casprocessorList = null;
       // this.enProcSt = null;
       stats = null;
@@ -3580,74 +3582,74 @@ public class CPMEngine extends Thread {
     }
   }
 
-  private static class ThreadGroupDestroyer extends Thread {
-    private Set<Thread> foreignThreadsBlockingShutdown = new LinkedHashSet<>();
-
-    private final ThreadGroup group;
-
-    private final AtomicInteger count = new AtomicInteger();
-
-    public ThreadGroupDestroyer(ThreadGroup aGroup) {
-      super(aGroup.getParent(), "threadGroupDestroyer");
-
-      group = aGroup;
-
-    }
-
-    @Override
-    public void run() {
-
-      while (group.activeCount() > 0) {
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-        }
-        Thread[] threads = new Thread[group.activeCount()];
-        group.enumerate(threads);
-        showThreads(threads, foreignThreadsBlockingShutdown, count);
-      }
-
-      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
-        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
-                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-                "UIMA_CPM_destroy_thread_group__FINEST",
-                new Object[] { Thread.currentThread().getName() });
-      }
-
-      group.destroy();
-    }
-
-    private void showThreads(Thread[] aThreadList, Set<Thread> foreignThreadsBlockingShutdown,
-            AtomicInteger count) {
-      count.incrementAndGet();
-
-      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
-        for (int i = 0; aThreadList != null && i < aThreadList.length; i++) {
-          if (aThreadList[i] != null) {
-            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
-                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_show_thread__FINEST",
-                    new Object[] { Thread.currentThread().getName(), String.valueOf(i),
-                        aThreadList[i].getName() });
-          }
-        }
-      }
-
-      if (count.get() % 5 == 0) {
-        for (Thread thread : aThreadList) {
-          Set<Thread> nonUimaThreads = new HashSet<>();
-          if (!(thread instanceof ProcessingUnit) && !(thread instanceof ArtifactProducer)
-                  && !foreignThreadsBlockingShutdown.contains(thread)) {
-            nonUimaThreads.add(thread);
-          }
-
-          if (nonUimaThreads.size() == aThreadList.length) {
-            UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
-                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unknown_thread__WARNING",
-                    new Object[] { thread.getName() });
-            foreignThreadsBlockingShutdown.add(thread);
-          }
-        }
-      }
-    }
-  }
+  // private static class ThreadGroupDestroyer extends Thread {
+  // private Set<Thread> foreignThreadsBlockingShutdown = new LinkedHashSet<>();
+  //
+  // private final ThreadGroup group;
+  //
+  // private final AtomicInteger count = new AtomicInteger();
+  //
+  // public ThreadGroupDestroyer(ThreadGroup aGroup) {
+  // super(aGroup.getParent(), "threadGroupDestroyer");
+  //
+  // group = aGroup;
+  //
+  // }
+  //
+  // @Override
+  // public void run() {
+  //
+  // while (group.activeCount() > 0) {
+  // try {
+  // Thread.sleep(100);
+  // } catch (InterruptedException e) {
+  // }
+  // Thread[] threads = new Thread[group.activeCount()];
+  // group.enumerate(threads);
+  // showThreads(threads, foreignThreadsBlockingShutdown, count);
+  // }
+  //
+  // if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+  // UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
+  // "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
+  // "UIMA_CPM_destroy_thread_group__FINEST",
+  // new Object[] { Thread.currentThread().getName() });
+  // }
+  //
+  // group.destroy();
+  // }
+  //
+  // private void showThreads(Thread[] aThreadList, Set<Thread> foreignThreadsBlockingShutdown,
+  // AtomicInteger count) {
+  // count.incrementAndGet();
+  //
+  // if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+  // for (int i = 0; aThreadList != null && i < aThreadList.length; i++) {
+  // if (aThreadList[i] != null) {
+  // UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
+  // "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_show_thread__FINEST",
+  // new Object[] { Thread.currentThread().getName(), String.valueOf(i),
+  // aThreadList[i].getName() });
+  // }
+  // }
+  // }
+  //
+  // if (count.get() % 5 == 0) {
+  // for (Thread thread : aThreadList) {
+  // Set<Thread> nonUimaThreads = new HashSet<>();
+  // if (!(thread instanceof ProcessingUnit) && !(thread instanceof ArtifactProducer)
+  // && !foreignThreadsBlockingShutdown.contains(thread)) {
+  // nonUimaThreads.add(thread);
+  // }
+  //
+  // if (nonUimaThreads.size() == aThreadList.length) {
+  // UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
+  // "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unknown_thread__WARNING",
+  // new Object[] { thread.getName() });
+  // foreignThreadsBlockingShutdown.add(thread);
+  // }
+  // }
+  // }
+  // }
+  // }
 }
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMThreadGroup.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMExecutorService.java
similarity index 55%
rename from uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMThreadGroup.java
rename to uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMExecutorService.java
index c01f7df..f676145 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMThreadGroup.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMExecutorService.java
@@ -19,105 +19,104 @@
 
 package org.apache.uima.collection.impl.cpm.engine;
 
-import java.util.ArrayList;
+import static org.apache.uima.UIMAFramework.getLogger;
+import static org.apache.uima.collection.impl.cpm.utils.CPMUtils.CPM_LOG_RESOURCE_BUNDLE;
+import static org.apache.uima.util.Level.FINER;
+import static org.apache.uima.util.Level.SEVERE;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.uima.UIMAFramework;
 import org.apache.uima.collection.StatusCallbackListener;
 import org.apache.uima.collection.base_cpm.BaseStatusCallbackListener;
 import org.apache.uima.collection.impl.EntityProcessStatusImpl;
-import org.apache.uima.collection.impl.cpm.utils.CPMUtils;
-import org.apache.uima.util.Level;
 import org.apache.uima.util.ProcessTrace;
 
-
 /**
  * This component catches uncaught errors in the CPM. All critical threads in the CPM are part of
  * this ThreadGroup. If OutOfMemory Error is thrown this component is notified by the JVM and its
  * job is to notify registered listeners.
- * 
  */
-public class CPMThreadGroup extends ThreadGroup {
-  
-  /** The callback listeners. */
-  private ArrayList callbackListeners = null;
+public class CPMExecutorService extends ThreadPoolExecutor {
 
-  /** The proc tr. */
-  private ProcessTrace procTr = null;
+  private List<BaseStatusCallbackListener> callbackListeners = null;
 
-  /**
-   * Instantiates a new CPM thread group.
-   *
-   * @param name the name
-   */
-  public CPMThreadGroup(String name) {
-    super(name);
-  }
+  private ProcessTrace procTr = null;
 
-  /**
-   * Instantiates a new CPM thread group.
-   *
-   * @param parent -
-   *          parent thread group
-   * @param name -
-   *          name of this thread group
-   */
-  public CPMThreadGroup(ThreadGroup parent, String name) {
-    super(parent, name);
+  public CPMExecutorService() {
+    super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
   }
 
   /**
    * Sets listeners to be used in notifications.
    *
-   * @param aListenerList -
-   *          list of registered listners
+   * @param aListenerList
+   *          list of registered listeners
    */
-  public void setListeners(ArrayList aListenerList) {
+  public void setListeners(List<BaseStatusCallbackListener> aListenerList) {
     callbackListeners = aListenerList;
   }
 
   /**
    * Sets the process trace.
    *
-   * @param aProcessTrace the new process trace
+   * @param aProcessTrace
+   *          the new process trace
    */
   public void setProcessTrace(ProcessTrace aProcessTrace) {
     procTr = aProcessTrace;
   }
 
-  /* (non-Javadoc)
-   * @see java.lang.ThreadGroup#uncaughtException(java.lang.Thread, java.lang.Throwable)
-   */
   @Override
-  public void uncaughtException(Thread t, Throwable e) {
-    if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
-      UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
-              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unhandled_error__SEVERE",
-              new Object[] { Thread.currentThread().getName(), e.getClass().getName() });
+  protected void afterExecute(Runnable aThread, Throwable aThrowable) {
+    Throwable throwable = aThrowable;
+    if (throwable == null && aThread instanceof FutureTask) {
+      try {
+        ((FutureTask<?>) aThread).get();
+      } catch (InterruptedException e) {
+        // Ignore
+      } catch (ExecutionException e) {
+        throwable = e.getCause();
+      }
+    }
+
+    if (throwable == null) {
+      return;
+    }
+
+    if (getLogger().isLoggable(SEVERE)) {
+      getLogger(this.getClass()).logrb(SEVERE, this.getClass().getName(), "process",
+              CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unhandled_error__SEVERE",
+              new Object[] { Thread.currentThread().getName(), throwable.getClass().getName() });
 
     }
     try {
       // Notify listeners
       for (int i = 0; callbackListeners != null && i < callbackListeners.size(); i++) {
-        // System.out.println("ThreadGroup.uncaughtException()-Got Error - Notifying Listener");
-        notifyListener((BaseStatusCallbackListener) callbackListeners.get(i), e);
+        notifyListener((BaseStatusCallbackListener) callbackListeners.get(i), throwable);
       }
 
     } catch (Throwable tr) {
-      if (UIMAFramework.getLogger().isLoggable(Level.FINER)) {
-        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
-                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
+      if (getLogger().isLoggable(FINER)) {
+        getLogger(this.getClass()).logrb(FINER, this.getClass().getName(), "process",
+                CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
                 new Object[] { Thread.currentThread().getName(), tr.getClass().getName() });
         tr.printStackTrace();
       }
     }
-    // System.out.println("ThreadGroup.uncaughtException()-Done Handling Error");
   }
 
   /**
    * Notify listener.
    *
-   * @param aStatCL the a stat CL
-   * @param e the e
+   * @param aStatCL
+   *          the a stat CL
+   * @param e
+   *          the e
    */
   private void notifyListener(BaseStatusCallbackListener aStatCL, Throwable e) {
     EntityProcessStatusImpl enProcSt = new EntityProcessStatusImpl(procTr);
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java
index 806fe99..b380675 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java
@@ -29,12 +29,11 @@ import org.apache.uima.collection.impl.cpm.utils.CpmLocalizedMessage;
 import org.apache.uima.util.FileUtils;
 import org.apache.uima.util.Level;
 
-
 /**
  * The Class DebugControlThread.
  */
 public class DebugControlThread implements Runnable {
-  
+
   /** The Constant NOTFOUND. */
   private final static String NOTFOUND = "NOT-FOUND";
 
@@ -61,9 +60,12 @@ public class DebugControlThread implements Runnable {
   /**
    * Instantiates a new debug control thread.
    *
-   * @param aCpm the a cpm
-   * @param aFilename the a filename
-   * @param aCheckpointFrequency the a checkpoint frequency
+   * @param aCpm
+   *          the a cpm
+   * @param aFilename
+   *          the a filename
+   * @param aCheckpointFrequency
+   *          the a checkpoint frequency
    */
   public DebugControlThread(CPMEngine aCpm, String aFilename, int aCheckpointFrequency) {
     cpm = aCpm;
@@ -74,7 +76,8 @@ public class DebugControlThread implements Runnable {
   /**
    * Start.
    *
-   * @throws RuntimeException the runtime exception
+   * @throws RuntimeException
+   *           the runtime exception
    */
   public void start() throws RuntimeException {
     if (fileName == null) {
@@ -84,10 +87,10 @@ public class DebugControlThread implements Runnable {
                 "UIMA_CPM_checkpoint_target_not_defined__FINEST",
                 new Object[] { Thread.currentThread().getName() });
       }
-      throw new RuntimeException(CpmLocalizedMessage.getLocalizedMessage(
-              CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-              "UIMA_CPM_EXP_target_checkpoint_not_defined__WARNING", new Object[] { Thread
-                      .currentThread().getName() }));
+      throw new RuntimeException(
+              CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
+                      "UIMA_CPM_EXP_target_checkpoint_not_defined__WARNING",
+                      new Object[] { Thread.currentThread().getName() }));
     }
     if (cpm == null) {
       if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
@@ -100,7 +103,7 @@ public class DebugControlThread implements Runnable {
               CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_EXP_invalid_cpm__WARNING",
               new Object[] { Thread.currentThread().getName() }));
     } else {
-      new Thread(this).start();
+      cpm.getExecutorService().submit(this);
     }
   }
 
@@ -113,7 +116,9 @@ public class DebugControlThread implements Runnable {
     doCheckpoint();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see java.lang.Runnable#run()
    */
   @Override
@@ -147,7 +152,8 @@ public class DebugControlThread implements Runnable {
   /**
    * Interpret and execute command.
    *
-   * @param aCommand the a command
+   * @param aCommand
+   *          the a command
    */
   private void interpretAndExecuteCommand(String aCommand) {
     if (aCommand == null) {