You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by re...@apache.org on 2022/01/19 12:38:11 UTC

[uima-uimaj] 01/01: [UIMA-6412] Stop using ThreadGroup in CPMEngine

This is an automated email from the ASF dual-hosted git repository.

rec pushed a commit to branch feature/UIMA-6412-Stop-using-ThreadGroup-in-CPMEngine
in repository https://gitbox.apache.org/repos/asf/uima-uimaj.git

commit ac4221f5d7bd33704896aeffd1f772740926de81
Author: Richard Eckart de Castilho <re...@apache.org>
AuthorDate: Wed Jan 19 13:38:04 2022 +0100

    [UIMA-6412] Stop using ThreadGroup in CPMEngine
    
    - Refactor the CPMThreadGroup into a CPMExecutorService
    - Replace Thread.join() calls with Future.get() calls
    - Replace the ThreadGroupDestroyer thread with a simple call to shutdown()
    - Replace the uncaught exception handler with overriding ThreadPoolExecutor.afterExecute()
    - Clean up and format a few classes
---
 .../uima/collection/impl/cpm/BaseCPMImpl.java      | 375 ++++++++++++---------
 .../uima/collection/impl/cpm/Checkpoint.java       |   7 -
 .../uima/collection/impl/cpm/engine/CPMEngine.java | 222 ++++++------
 ...CPMThreadGroup.java => CPMExecutorService.java} |  99 +++---
 .../impl/cpm/engine/DebugControlThread.java        |  32 +-
 5 files changed, 401 insertions(+), 334 deletions(-)

diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java
index 972e7ee..05fa071 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/BaseCPMImpl.java
@@ -21,13 +21,13 @@ package org.apache.uima.collection.impl.cpm;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.UIMARuntimeException;
@@ -44,7 +44,7 @@ import org.apache.uima.collection.impl.base_cpm.container.ProcessingContainer;
 import org.apache.uima.collection.impl.cpm.container.CPEFactory;
 import org.apache.uima.collection.impl.cpm.container.deployer.socket.ProcessControllerAdapter;
 import org.apache.uima.collection.impl.cpm.engine.CPMEngine;
-import org.apache.uima.collection.impl.cpm.engine.CPMThreadGroup;
+import org.apache.uima.collection.impl.cpm.engine.CPMExecutorService;
 import org.apache.uima.collection.impl.cpm.utils.CPMUtils;
 import org.apache.uima.collection.impl.cpm.utils.CasMetaData;
 import org.apache.uima.collection.impl.cpm.utils.CpmLocalizedMessage;
@@ -62,15 +62,12 @@ import org.apache.uima.util.Progress;
 import org.apache.uima.util.UimaTimer;
 import org.apache.uima.util.impl.ProcessTrace_impl;
 
-
 /**
  * Main thread that launches CPE and manages it. An application interacts with the running CPE via
  * this object. Through an API, an application may start, pause, resume, and stop a CPE.
- * 
- * 
  */
 public class BaseCPMImpl implements BaseCPM, Runnable {
-  
+
   /** The default process trace. */
   private boolean defaultProcessTrace;
 
@@ -107,55 +104,60 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /** The m event type map. */
   private Map mEventTypeMap;
 
-  /** The cpm thread group. */
-  public CPMThreadGroup cpmThreadGroup = null;
+  /** The CPE executor service. */
+  public CPMExecutorService cpmExecutorService = null;
 
   /**
    * Instantiates and initializes CPE Factory with a given CPE Descriptor and defaults.
    * 
-   * @param aDescriptor -
-   *          parsed CPE descriptor
-   * @throws Exception -
+   * @param aDescriptor
+   *          - parsed CPE descriptor
+   * @throws Exception
+   *           -
    */
   public BaseCPMImpl(CpeDescription aDescriptor) throws Exception {
     this(aDescriptor, null, true, UIMAFramework.getDefaultPerformanceTuningProperties());
-    cpmThreadGroup = new CPMThreadGroup("CPM Thread Group");
+    cpmExecutorService = new CPMExecutorService();
   }
 
   /**
    * Instantiates and initializes CPE Factory responsible for creating individual components that
    * are part of the processing pipeline.
    *
-   * @param aDescriptor -
-   *          parsed CPE descriptor
-   * @param aResourceManager -
-   *          ResourceManager instance to be used by the CPE
-   * @param aDefaultProcessTrace -
-   *          ProcessTrace instance to capture events and stats
-   * @param aProps the a props
-   * @throws Exception -
+   * @param aDescriptor
+   *          - parsed CPE descriptor
+   * @param aResourceManager
+   *          - ResourceManager instance to be used by the CPE
+   * @param aDefaultProcessTrace
+   *          - ProcessTrace instance to capture events and stats
+   * @param aProps
+   *          the a props
+   * @throws Exception
+   *           -
    */
   public BaseCPMImpl(CpeDescription aDescriptor, ResourceManager aResourceManager,
           boolean aDefaultProcessTrace, Properties aProps) throws Exception {
     cpeFactory = new CPEFactory(aDescriptor, aResourceManager);
     defaultProcessTrace = aDefaultProcessTrace;
-    cpmThreadGroup = new CPMThreadGroup("CPM Thread Group");
+    cpmExecutorService = new CPMExecutorService();
     init(false, aProps);
   }
 
   /**
    * Parses CPE descriptor.
    *
-   * @param mode -
-   *          indicates if the CPM should use a static descriptor or one provided
-   * @param aDescriptor -
-   *          provided descriptor path
-   * @param aResourceManager          ResourceManager to be used by CPM
-   * @throws Exception -
+   * @param mode
+   *          - indicates if the CPM should use a static descriptor or one provided
+   * @param aDescriptor
+   *          - provided descriptor path
+   * @param aResourceManager
+   *          ResourceManager to be used by CPM
+   * @throws Exception
+   *           -
    */
   public BaseCPMImpl(Boolean mode, String aDescriptor, ResourceManager aResourceManager)
           throws Exception {
-    cpmThreadGroup = new CPMThreadGroup("CPM Thread Group");
+    cpmExecutorService = new CPMExecutorService();
     cpeFactory = new CPEFactory(aResourceManager);
     if (mode == null) {
       defaultProcessTrace = true;
@@ -170,7 +172,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Plugs in custom perfomance tunning parameters.
    *
-   * @param aPerformanceTuningSettings the new performance tuning settings
+   * @param aPerformanceTuningSettings
+   *          the new performance tuning settings
    */
   public void setPerformanceTuningSettings(Properties aPerformanceTuningSettings) {
     cpEngine.setPerformanceTuningSettings(aPerformanceTuningSettings);
@@ -180,8 +183,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
    * Plugs in a given {@link ProcessControllerAdapter}. The CPM uses this adapter to request Cas
    * Processor restarts and shutdown.
    * 
-   * @param aPca -
-   *          instance of the ProcessControllerAdapter
+   * @param aPca
+   *          - instance of the ProcessControllerAdapter
    */
   public void setProcessControllerAdapter(ProcessControllerAdapter aPca) {
     cpEngine.setProcessControllerAdapter(aPca);
@@ -192,7 +195,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
    * use at the end of processing. Jedii-style reporting shows a summary for this run. The CPM
    * default report shows more detail information.
    *
-   * @param aUseJediiReport the new jedii report
+   * @param aUseJediiReport
+   *          the new jedii report
    */
   public void setJediiReport(boolean aUseJediiReport) {
     mEventTypeMap = new HashMap();
@@ -205,9 +209,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Instantiates and initializes a CPE.
    *
-   * @param aDummyCasProcessor -
-   * @param aProps the a props
-   * @throws Exception -
+   * @param aDummyCasProcessor
+   *          -
+   * @param aProps
+   *          the a props
+   * @throws Exception
+   *           -
    */
   public void init(boolean aDummyCasProcessor, Properties aProps) throws Exception {
     String uimaTimerClass = cpeFactory.getCPEConfig().getTimerImpl();
@@ -239,8 +246,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     }
     if (checkpointFileName != null && checkpointFileName.trim().length() > 0) {
       File checkpointFile = new File(checkpointFileName);
-      checkpoint = new Checkpoint(this, checkpointFileName, cpeFactory.getCPEConfig()
-              .getCheckpoint().getFrequency());
+      checkpoint = new Checkpoint(this, checkpointFileName,
+              cpeFactory.getCPEConfig().getCheckpoint().getFrequency());
       // Check if the checkpoint file already exists. If it does, the CPM did not complete
       // successfully during the previous run and CPM will start in recovery mode, restoring all
       // totals and status's from the recovered checkpoint. The processing pipeline state will
@@ -259,7 +266,7 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
       }
     }
     // Instantiate class responsible for processing
-    cpEngine = new CPMEngine(cpmThreadGroup, cpeFactory, procTr, checkpointData);
+    cpEngine = new CPMEngine(cpmExecutorService, cpeFactory, procTr, checkpointData);
     if (!aDummyCasProcessor) {
       int concurrentThreadCount = cpeFactory.getCpeDescriptor().getCpeCasProcessors()
               .getConcurrentPUCount();
@@ -267,12 +274,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
         CasProcessor[] casProcessors = cpeFactory.getCasProcessors();
         for (int i = 0; i < casProcessors.length; i++) {
           if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
-            UIMAFramework.getLogger(this.getClass()).logrb(
-                    Level.CONFIG,
-                    this.getClass().getName(),
-                    "process",
-                    CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-                    "UIMA_CPM_add_cp__CONFIG",
+            UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
+                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_add_cp__CONFIG",
                     new Object[] { Thread.currentThread().getName(),
                         casProcessors[i].getProcessingResourceMetaData().getName() });
           }
@@ -299,8 +302,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
       cpEngine.setInputQueueSize(casPoolSize == 0 ? iqSize : casPoolSize);
     } catch (NumberFormatException e) {
       throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-              "UIMA_CPM_EXP_queue_size_not_defined__WARNING", new Object[] {
-                  Thread.currentThread().getName(), "inputQueueSize" }));
+              "UIMA_CPM_EXP_queue_size_not_defined__WARNING",
+              new Object[] { Thread.currentThread().getName(), "inputQueueSize" }));
     }
     try {
       int oqSize = 0;
@@ -310,16 +313,17 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
       cpEngine.setOutputQueueSize(casPoolSize == 0 ? oqSize : casPoolSize + 2);
     } catch (NumberFormatException e) {
       throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-              "UIMA_CPM_EXP_queue_size_not_defined__WARNING", new Object[] {
-                  Thread.currentThread().getName(), "outputQueueSize" }));
+              "UIMA_CPM_EXP_queue_size_not_defined__WARNING",
+              new Object[] { Thread.currentThread().getName(), "outputQueueSize" }));
     }
     try {
       int threadCount = cpeFactory.getCpeDescriptor().getCpeCasProcessors().getConcurrentPUCount();
       cpEngine.setConcurrentThreadSize(threadCount);
     } catch (NumberFormatException e) {
       throw new Exception(CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-              "UIMA_CPM_EXP_invalid_component_reference__WARNING", new Object[] {
-                  Thread.currentThread().getName(), "casProcessors", "processingUnitThreadCount" }));
+              "UIMA_CPM_EXP_invalid_component_reference__WARNING",
+              new Object[] { Thread.currentThread().getName(), "casProcessors",
+                  "processingUnitThreadCount" }));
     }
   }
 
@@ -327,13 +331,16 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
    * Returns {@link CPEConfig} object holding current CPE configuration.
    *
    * @return CPEConfig instance
-   * @throws Exception -
+   * @throws Exception
+   *           -
    */
   public CpeConfiguration getCPEConfig() throws Exception {
     return cpeFactory.getCPEConfig();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#getCasProcessors()
    */
   /*
@@ -347,29 +354,40 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     return casProcs == null ? new CasProcessor[0] : casProcs;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+   * .CasProcessor)
    */
   /*
    * Adds given CasProcessor to the processing pipeline. A new CasProcessor is appended to the
    * current list.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+   * .CasProcessor)
    */
   @Override
   public void addCasProcessor(CasProcessor aCasProcessor) throws ResourceConfigurationException {
     cpEngine.addCasProcessor(aCasProcessor);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor, int)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+   * .CasProcessor, int)
    */
   /*
    * Adds given CasProcessor to the processing pipeline. A new CasProcessor is inserted into a given
    * spot in the current list.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor,
-   *      int)
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#addCasProcessor(org.apache.uima.collection.base_cpm
+   * .CasProcessor, int)
    */
   @Override
   public void addCasProcessor(CasProcessor aCasProcessor, int aIndex)
@@ -377,26 +395,34 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     cpEngine.addCasProcessor(aCasProcessor, aIndex);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.
+   * base_cpm.CasProcessor)
    */
   /*
    * Removes given CasProcessor from the processing pipeline.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeCasProcessor(org.apache.uima.collection.
+   * base_cpm.CasProcessor)
    */
   @Override
   public void removeCasProcessor(CasProcessor aCasProcessor) {
     cpEngine.removeCasProcessor(0);
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(java.lang.String)
    */
   /*
    * Disables given CasProcessor in the existing processing pipeline.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.
+   * base_cpm.CasProcessor)
    */
   @Override
   public void disableCasProcessor(String aCasProcessorName) {
@@ -407,19 +433,24 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Enable cas processor.
    *
-   * @param aCasProcessorName the a cas processor name
+   * @param aCasProcessorName
+   *          the a cas processor name
    */
   /*
    * Disables given CasProcessor in the existing processing pipeline.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.base_cpm.CasProcessor)
+   * @see
+   * org.apache.uima.collection.base_cpm.BaseCPM#disableCasProcessor(org.apache.uima.collection.
+   * base_cpm.CasProcessor)
    */
   public void enableCasProcessor(String aCasProcessorName) {
 
     cpEngine.enableCasProcessor(aCasProcessorName);
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#isSerialProcessingRequired()
    */
   /*
@@ -440,7 +471,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   public void setSerialProcessingRequired(boolean aRequired) {
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#isPauseOnException()
    */
   /*
@@ -453,7 +486,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     return cpEngine.isPauseOnException();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#setPauseOnException(boolean)
    */
   /*
@@ -466,34 +501,44 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     cpEngine.setPauseOnException(aPause);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.
+   * collection.base_cpm.BaseStatusCallbackListener)
    */
   /*
    * Adds Event Listener. Important events like, end of entity processing, exceptions, etc will be
    * sent to the registered listeners.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#addStatusCallbackListener(org.apache.uima.
+   * collection.base_cpm.BaseStatusCallbackListener)
    */
   @Override
   public void addStatusCallbackListener(BaseStatusCallbackListener aListener) {
     cpEngine.addStatusCallbackListener(aListener);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.
+   * collection.base_cpm.BaseStatusCallbackListener)
    */
   /*
    * Remoces named listener from the listener list.
    * 
-   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.collection.base_cpm.BaseStatusCallbackListener)
+   * @see org.apache.uima.collection.base_cpm.BaseCPM#removeStatusCallbackListener(org.apache.uima.
+   * collection.base_cpm.BaseStatusCallbackListener)
    */
   @Override
   public void removeStatusCallbackListener(BaseStatusCallbackListener aListener) {
     cpEngine.removeStatusCallbackListener(aListener);
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see java.lang.Runnable#run()
    */
   /*
@@ -523,11 +568,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
 
       // Start the checkpoint thread
       if (checkpoint != null) {
-        new Thread(checkpoint).start();
+        cpmExecutorService.submit(checkpoint);
       }
-      cpEngine.start();
-      // Joing the CPMWorker Thread and wait until it finishes
-      cpEngine.join();
+
+      Future<?> cpEngineResult = cpmExecutorService.submit(cpEngine);
+      // Joining the CPMWorker Thread and wait until it finishes
+      cpEngineResult.get();
 
       completed = true;
       // If the entire collection has been processed there is no need for a checkpoint.
@@ -569,14 +615,13 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
       }
       UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "" + e);
       killed = true;
-      ArrayList statusCbL = cpEngine.getCallbackListeners();
+      List<BaseStatusCallbackListener> statusCbL = cpEngine.getCallbackListeners();
       EntityProcessStatusImpl enProcSt = new EntityProcessStatusImpl(procTr, true);
       // e is the actual exception.
       enProcSt.addEventStatus("CPM", "Failed", e);
 
       // Notify all listeners that the CPM has finished processing
-      for (int j = 0; j < statusCbL.size(); j++) {
-        BaseStatusCallbackListener st = (BaseStatusCallbackListener) statusCbL.get(j);
+      for (BaseStatusCallbackListener st : statusCbL) {
         if (st != null && st instanceof StatusCallbackListener) {
           ((StatusCallbackListener) st).entityProcessComplete(null, enProcSt);
         }
@@ -592,11 +637,11 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
               "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cpm_stopped__FINEST",
               new Object[] { Thread.currentThread().getName(), String.valueOf(killed) });
     }
-    ArrayList statusCbL = cpEngine.getCallbackListeners();
+
+    List<BaseStatusCallbackListener> statusCbL = cpEngine.getCallbackListeners();
     // Notify all listeners that the CPM has finished processing
-    for (int j = 0; j < statusCbL.size(); j++) {
-      BaseStatusCallbackListener st = (BaseStatusCallbackListener) statusCbL.get(j);
-      if ( st != null ) {
+    for (BaseStatusCallbackListener st : statusCbL) {
+      if (st != null) {
         if (!killed) {
           st.collectionProcessComplete();
         } else {
@@ -619,10 +664,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
    * This method is called by an applications to begin CPM processing with a given Collection. It
    * just creates a new thread and starts it.
    *
-   * @param aCollectionReader the a collection reader
-   * @throws ResourceInitializationException the resource initialization exception
+   * @param aCollectionReader
+   *          the a collection reader
+   * @throws ResourceInitializationException
+   *           the resource initialization exception
    * @see org.apache.uima.collection.base_cpm.BaseCPM#process()
-   * @deprecated 
+   * @deprecated
    */
   @Deprecated
   public void process(BaseCollectionReader aCollectionReader)
@@ -637,9 +684,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     if (cpeFactory.isDefault()) {
       cpeFactory.addCollectionReader(collectionReader);
     }
-    cpmThreadGroup.setProcessTrace(procTr);
-    cpmThreadGroup.setListeners(cpEngine.getCallbackListeners());
-    new Thread(this).start();
+    cpmExecutorService.setProcessTrace(procTr);
+    cpmExecutorService.setListeners(cpEngine.getCallbackListeners());
+    cpmExecutorService.submit(this);
   }
 
   /**
@@ -664,21 +711,24 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     if (cpeFactory.isDefault()) {
       cpeFactory.addCollectionReader(collectionReader);
     }
-    cpmThreadGroup.setProcessTrace(procTr);
-    cpmThreadGroup.setListeners(cpEngine.getCallbackListeners());
+    cpmExecutorService.setProcessTrace(procTr);
+    cpmExecutorService.setListeners(cpEngine.getCallbackListeners());
 
-    new Thread(this).start();
+    cpmExecutorService.submit(this);
   }
 
   /**
    * This method is called by an applications to begin CPM processing with a given Collection. It
    * just creates a new thread and starts it.
    *
-   * @param aCollectionReader the a collection reader
-   * @param aBatchSize the a batch size
-   * @throws ResourceInitializationException the resource initialization exception
+   * @param aCollectionReader
+   *          the a collection reader
+   * @param aBatchSize
+   *          the a batch size
+   * @throws ResourceInitializationException
+   *           the resource initialization exception
    * @see org.apache.uima.collection.base_cpm.BaseCPM#process()
-   * @deprecated 
+   * @deprecated
    */
   @Deprecated
   public void process(BaseCollectionReader aCollectionReader, int aBatchSize)
@@ -689,10 +739,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     if (cpeFactory.isDefault()) {
       cpeFactory.addCollectionReader(collectionReader);
     }
-    cpmThreadGroup.setProcessTrace(procTr);
-    cpmThreadGroup.setListeners(cpEngine.getCallbackListeners());
-
-    new Thread(cpmThreadGroup, this).start();
+    cpmExecutorService.setProcessTrace(procTr);
+    cpmExecutorService.setListeners(cpEngine.getCallbackListeners());
+    cpmExecutorService.submit(this);
   }
 
   /**
@@ -726,7 +775,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     }
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#isProcessing()
    */
   /*
@@ -740,7 +791,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     return cpEngine.isRunning();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#pause()
    */
   /*
@@ -757,7 +810,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     }
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#isPaused()
    */
   /*
@@ -770,7 +825,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     return cpEngine.isPaused();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#resume(boolean)
    */
   /*
@@ -783,7 +840,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     resume();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#resume()
    */
   /*
@@ -825,7 +884,9 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
 
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.uima.collection.base_cpm.BaseCPM#stop()
    */
   /*
@@ -888,7 +949,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Decode status.
    *
-   * @param aStatus the a status
+   * @param aStatus
+   *          the a status
    * @return the string
    */
   /*
@@ -920,10 +982,14 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Copy component events.
    *
-   * @param aEvType the a ev type
-   * @param aList the a list
-   * @param aPTr the a P tr
-   * @throws IOException Signals that an I/O exception has occurred.
+   * @param aEvType
+   *          the a ev type
+   * @param aList
+   *          the a list
+   * @param aPTr
+   *          the a P tr
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
    */
   /*
    * Copies events of a given type found in the list to a provided ProcessTrace instance
@@ -946,10 +1012,10 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Helper method to display stats and totals.
    *
-   * @param aProcessTrace -
-   *          trace containing stats
-   * @param aNumDocsProcessed -
-   *          number of entities processed so far
+   * @param aProcessTrace
+   *          - trace containing stats
+   * @param aNumDocsProcessed
+   *          - number of entities processed so far
    */
   public void displayStats(ProcessTrace aProcessTrace, int aNumDocsProcessed) {
     if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
@@ -965,10 +1031,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
       // get the time.
       if ("CPM".equals(event.getComponentName())) {
         if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
-          UIMAFramework.getLogger(this.getClass()).log(
-                  Level.FINEST,
-                  "Current Component::" + event.getComponentName() + " Time::"
-                          + event.getDuration());
+          UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, "Current Component::"
+                  + event.getComponentName() + " Time::" + event.getDuration());
         }
         continue;
       }
@@ -1000,8 +1064,10 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Helper method to help build the CPM report.
    *
-   * @param aEvent the a event
-   * @param aTotalTime the a total time
+   * @param aEvent
+   *          the a event
+   * @param aTotalTime
+   *          the a total time
    */
   public void buildEventTree(ProcessTraceEvent aEvent, int aTotalTime) {
     // Skip reporting the CPM time.This time has already been acquired by summing up
@@ -1019,10 +1085,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
     }
 
     if (System.getProperty("DEBUG") != null) {
-      UIMAFramework.getLogger(this.getClass()).log(
-              Level.FINEST,
-              "" + pct + "% (" + duration + "ms) - " + aEvent.getComponentName() + " (" + type
-                      + ")");
+      UIMAFramework.getLogger(this.getClass()).log(Level.FINEST, "" + pct + "% (" + duration
+              + "ms) - " + aEvent.getComponentName() + " (" + type + ")");
     }
     Iterator it = aEvent.getSubEvents().iterator();
     while (it.hasNext()) {
@@ -1103,16 +1167,16 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
         Long totalCollectionReaderTime = (Long) perfReport.get("COLLECTION_READER_TIME");
         String readerName = collectionReader.getProcessingResourceMetaData().getName();
         if (totalCollectionReaderTime != null) {
-          processTrace.addEvent(readerName, "COLLECTION_READER_TIME", String
-                  .valueOf(totalCollectionReaderTime), 0, null);
+          processTrace.addEvent(readerName, "COLLECTION_READER_TIME",
+                  String.valueOf(totalCollectionReaderTime), 0, null);
         }
         for (int i = 0; i < colReaderProgress.length; i++) {
           if (Progress.BYTES.equals(colReaderProgress[i].getUnit())) {
-            processTrace.addEvent(readerName, Constants.COLLECTION_READER_BYTES_PROCESSED, String
-                    .valueOf(colReaderProgress[i].getCompleted()), 0, null);
+            processTrace.addEvent(readerName, Constants.COLLECTION_READER_BYTES_PROCESSED,
+                    String.valueOf(colReaderProgress[i].getCompleted()), 0, null);
           } else if (Progress.ENTITIES.equals(colReaderProgress[i].getUnit())) {
-            processTrace.addEvent(readerName, Constants.COLLECTION_READER_DOCS_PROCESSED, String
-                    .valueOf(colReaderProgress[i].getCompleted()), 0, null);
+            processTrace.addEvent(readerName, Constants.COLLECTION_READER_DOCS_PROCESSED,
+                    String.valueOf(colReaderProgress[i].getCompleted()), 0, null);
           }
         }
 
@@ -1133,8 +1197,8 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
         }
         copyComponentEvents("Process", eList, processTrace);
 
-        processTrace.addEvent(container.getName(), "Documents Processed", String.valueOf(container
-                .getProcessed()), 0, null);
+        processTrace.addEvent(container.getName(), "Documents Processed",
+                String.valueOf(container.getProcessed()), 0, null);
         String status = decodeStatus(container.getStatus());
         processTrace.addEvent(container.getName(), "Processor Status", status, 0, null);
 
@@ -1147,20 +1211,20 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
                 0, null);
 
         int restartCount = container.getRestartCount();
-        processTrace.addEvent(container.getName(), "Processor Restarts", String
-                .valueOf(restartCount), 0, null);
+        processTrace.addEvent(container.getName(), "Processor Restarts",
+                String.valueOf(restartCount), 0, null);
 
         int retryCount = container.getRetryCount();
         processTrace.addEvent(container.getName(), "Processor Retries", String.valueOf(retryCount),
                 0, null);
 
         int filteredCount = container.getFilteredCount();
-        processTrace.addEvent(container.getName(), "Filtered Entities", String
-                .valueOf(filteredCount), 0, null);
+        processTrace.addEvent(container.getName(), "Filtered Entities",
+                String.valueOf(filteredCount), 0, null);
 
         long remainingCount = container.getRemaining();
-        processTrace.addEvent(container.getName(), "Processor Remaining", String
-                .valueOf(remainingCount), 0, null);
+        processTrace.addEvent(container.getName(), "Processor Remaining",
+                String.valueOf(remainingCount), 0, null);
 
         HashMap aMap = container.getAllStats();
 
@@ -1181,16 +1245,15 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
                           "Custom String Stat-" + key + " Value=" + (String) o);
                 }
               } else if (o instanceof Integer) {
-                processTrace.addEvent(container.getName(), key, String.valueOf(((Integer) o)
-                        .intValue()), 0, null);
+                processTrace.addEvent(container.getName(), key,
+                        String.valueOf(((Integer) o).intValue()), 0, null);
                 if (System.getProperty("SHOW_CUSTOM_STATS") != null) {
                   UIMAFramework.getLogger(this.getClass()).log(Level.FINEST,
                           "Custom Integer Stat-" + key + " Value=" + o);
-                } 
+                }
               } else {
                 if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
-                  UIMAFramework.getLogger(this.getClass()).log(
-                          Level.FINEST,
+                  UIMAFramework.getLogger(this.getClass()).log(Level.FINEST,
                           "Invalid Type Found When Generating Status For " + key + ". Type::"
                                   + o.getClass().getName()
                                   + " Not supported. Use Integer or String instead.");
@@ -1218,9 +1281,12 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Creates the default process trace.
    *
-   * @param aProcessors the a processors
-   * @param srcProcTr the src proc tr
-   * @param aProcessTrace the a process trace
+   * @param aProcessors
+   *          the CAS processors
+   * @param srcProcTr
+   *          the source process trace
+   * @param aProcessTrace
+   *          the process trace
    */
   private void createDefaultProcessTrace(CasProcessor[] aProcessors, ProcessTrace srcProcTr,
           ProcessTrace aProcessTrace) {
@@ -1253,10 +1319,11 @@ public class BaseCPMImpl implements BaseCPM, Runnable {
   /**
    * Returns a CPE descriptor as a String.
    *
-   * @param aList -
-   *          list of components
+   * @param aList
+   *          - list of components
    * @return - descriptor populated with a given components
-   * @throws ResourceConfigurationException the resource configuration exception
+   * @throws ResourceConfigurationException
+   *           the resource configuration exception
    */
   public String getDescriptor(List aList) throws ResourceConfigurationException {
     return cpeFactory.getDescriptor(aList);
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java
index be1217b..f472b46 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/Checkpoint.java
@@ -88,13 +88,6 @@ public class Checkpoint implements Runnable {
   }
 
   /**
-   * Start the thread.
-   */
-  public void start() {
-    new Thread(this).start();
-  }
-
-  /**
    * Stops the checkpoint thread.
    */
   public void stop() {
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java
index 9a08962..afe544b 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java
@@ -24,16 +24,14 @@ import java.lang.reflect.Constructor;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.adapter.vinci.util.Descriptor;
@@ -111,7 +109,9 @@ public class CPMEngine extends Thread {
   /** The Constant SINGLE_THREADED_MODE. */
   private static final String SINGLE_THREADED_MODE = "single-threaded";
 
-  /** The cas pool. */
+  private final CPMExecutorService executorService;
+
+  /** The CAS pool. */
   public CPECasPool casPool;
 
   /** The lock for pause. */
@@ -215,6 +215,7 @@ public class CPMEngine extends Thread {
   /** The producer. */
   // work Queue
   private ArtifactProducer producer = null;
+  private Future<?> producerResult;
 
   /** The cpe factory. */
   // Factory responsible for instantiating CPE components from CPE descriptor
@@ -223,12 +224,14 @@ public class CPMEngine extends Thread {
   /** The processing units. */
   // An array holding instances of components responsible for analysis
   protected ProcessingUnit[] processingUnits = null;
+  protected Future<?>[] processingUnitResults = null;
 
   // Instantiate a Processing Unit containing CasConsumers. There may be many Analysis Processing
   // Units
   /** The cas consumer PU. */
   // but there is one CasConsumer Processing Unit ( at least for now).
   private ProcessingUnit casConsumerPU = null;
+  private Future<?> casConsumerPUResult;
 
   /** The output queue. */
   // Queue where result of analysis goes to be consumed by Consumers
@@ -309,7 +312,7 @@ public class CPMEngine extends Thread {
    * Initializes Collection Processing Engine. Assigns this thread and all processing threads
    * created by this component to a common Thread Group.
    *
-   * @param aThreadGroup
+   * @param aExecutorService
    *          - contains all CPM related threads
    * @param aCpeFactory
    *          - CPE factory object responsible for parsing cpe descriptor and creating components
@@ -320,9 +323,10 @@ public class CPMEngine extends Thread {
    * @throws Exception
    *           the exception
    */
-  public CPMEngine(CPMThreadGroup aThreadGroup, CPEFactory aCpeFactory, ProcessTrace aProcTr,
-          CheckpointData aCheckpointData) throws Exception {
-    super(aThreadGroup, "CPMEngine Thread");
+  public CPMEngine(CPMExecutorService aExecutorService, CPEFactory aCpeFactory,
+          ProcessTrace aProcTr, CheckpointData aCheckpointData) throws Exception {
+    setName("CPMEngine Thread");
+    executorService = aExecutorService;
     cpeFactory = aCpeFactory;
     // Accumulate trace info in provided ProcessTrace instance
     procTr = aProcTr;
@@ -357,6 +361,10 @@ public class CPMEngine extends Thread {
     }
   }
 
+  CPMExecutorService getExecutorService() {
+    return executorService;
+  }
+
   /**
    * Returns a list of Processing Containers for Analysis Engines. Each CasProcessor is managed by
    * its own container.
@@ -647,7 +655,8 @@ public class CPMEngine extends Thread {
               "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_terminate_pipelines__INFO",
               new Object[] { Thread.currentThread().getName(), String.valueOf(killed) });
     }
-    new Thread() {
+
+    executorService.submit(new Thread() {
       @Override
       public void run() {
         Object[] eofToken = new Object[1];
@@ -739,10 +748,8 @@ public class CPMEngine extends Thread {
                   "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
                   new Object[] { Thread.currentThread().getName(), e.getMessage() });
         }
-
       }
-    }.start();
-
+    });
   }
 
   /**
@@ -1686,7 +1693,7 @@ public class CPMEngine extends Thread {
   private void startDebugControlThread() {
     String dbgCtrlFile = System.getProperty("DEBUG_CONTROL");
     dbgCtrlThread = new DebugControlThread(this, dbgCtrlFile, 1000);
-    dbgCtrlThread.start();
+    executorService.submit(dbgCtrlThread);
   }
 
   /**
@@ -1840,13 +1847,8 @@ public class CPMEngine extends Thread {
                 new Object[] { Thread.currentThread().getName(), t.getMessage() });
         return;
       } finally {
-        ((CPMThreadGroup) getThreadGroup()).cleanup();
-        // Fix for memory leak. CPMThreadGroup must be
-        // destroyed, but not until AFTER all threads that it
-        // owns, including this one, have ended. - Adam
-        final ThreadGroup group = getThreadGroup();
-        Thread threadGroupDestroyer = new ThreadGroupDestroyer(group);
-        threadGroupDestroyer.start();
+        executorService.cleanup();
+        executorService.shutdown();
       }
     }
 
@@ -2015,7 +2017,7 @@ public class CPMEngine extends Thread {
         // name the thread
         casConsumerPU.setName("[CasConsumer Pipeline Thread]::");
         // start the CasConsumer Thread
-        casConsumerPU.start();
+        casConsumerPUResult = executorService.submit(casConsumerPU);
         consumerThreadStarted = true;
       }
       if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
@@ -2068,6 +2070,7 @@ public class CPMEngine extends Thread {
 
       // Setup Processing Pipelines
       processingUnits = new ProcessingUnit[concurrentThreadCount];
+      processingUnitResults = new Future<?>[concurrentThreadCount];
       synchronized (this) {
         activeProcessingUnits = concurrentThreadCount; // keeps track of how many threads are still
         // active. -Adam
@@ -2145,7 +2148,7 @@ public class CPMEngine extends Thread {
         processingUnits[i].setName("[Procesing Pipeline#" + (i + 1) + " Thread]::");
 
         // Start the Processing Pipeline
-        processingUnits[i].start();
+        processingUnitResults[i] = executorService.submit(processingUnits[i]);
         processingThreadsState[i] = 1; // Started
       }
 
@@ -2153,7 +2156,7 @@ public class CPMEngine extends Thread {
       // Start the ArtifactProducer thread and the Collection Reader embedded therein. The
       // Collection Reader begins
       // processing and deposits CASes onto a work queue.
-      producer.start();
+      producerResult = executorService.submit(producer);
       readerThreadStarted = true;
 
       // Indicate that ALL threads making up the CPE have been started
@@ -2180,7 +2183,7 @@ public class CPMEngine extends Thread {
       // simply terminates the thread. Once it terminates lets just make sure that
       // all threads finish and the work queue is completely depleted and all entities
       // are processed
-      producer.join();
+      producerResult.get();
       if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
         UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                 "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_thread_completed__FINEST",
@@ -2195,7 +2198,7 @@ public class CPMEngine extends Thread {
                   new Object[] { Thread.currentThread().getName(), processingUnits[i].getName(),
                       String.valueOf(i) });
         }
-        processingUnits[i].join();
+        processingUnitResults[i].get();
         if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
           UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                   "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_pu_complete__FINEST",
@@ -2250,7 +2253,7 @@ public class CPMEngine extends Thread {
 
         }
 
-        casConsumerPU.join();
+        casConsumerPUResult.get();
         if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
           UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                   "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_cc_completed__FINEST",
@@ -2361,9 +2364,9 @@ public class CPMEngine extends Thread {
         if (producer != null && !producer.isRunning()) {
           try {
             if (!readerThreadStarted) {
-              producer.start();
+              executorService.submit(producer);
             }
-            producer.join();
+            producerResult.get();
           } catch (Exception ex1) {
             UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                     "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_exception__SEVERE",
@@ -2377,9 +2380,9 @@ public class CPMEngine extends Thread {
         if (casConsumerPU != null && !casConsumerPU.isRunning()) {
           try {
             if (!consumerThreadStarted) {
-              casConsumerPU.start();
+              executorService.submit(casConsumerPU);
             }
-            casConsumerPU.join();
+            casConsumerPUResult.get();
           } catch (Exception ex1) {
             UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                     "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cc_exception__SEVERE",
@@ -2412,10 +2415,10 @@ public class CPMEngine extends Thread {
               // In such a case 'processingThreadsState[i] = -1'
 
               if (processingThreadsState[i] == -1 && !processingUnits[i].isRunning()) {
-                processingUnits[i].start();
+                executorService.submit(processingUnits[i]);
               }
               try {
-                processingUnits[i].join();
+                processingUnitResults[i].get();
                 if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
                   UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                           this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
@@ -2485,9 +2488,11 @@ public class CPMEngine extends Thread {
           notifyListenersWithException(e);
         }
         try {
-          casConsumerPU.join();
+          casConsumerPUResult.get();
         } catch (InterruptedException e) {
 
+        } catch (ExecutionException e) {
+          e.printStackTrace();
         }
         if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
           UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
@@ -2496,12 +2501,7 @@ public class CPMEngine extends Thread {
         }
       }
 
-      // Fix for memory leak. CPMThreadGroup must be
-      // destroyed, but not until AFTER all threads that it
-      // owns, including this one, have ended. - Adam
-      final ThreadGroup group = getThreadGroup();
-      Thread threadGroupDestroyer = new ThreadGroupDestroyer(group);
-      threadGroupDestroyer.start();
+      executorService.shutdown();
     }
   }
 
@@ -2593,6 +2593,7 @@ public class CPMEngine extends Thread {
         producer.cleanup();
       }
       producer = null;
+      producerResult = null;
 
       if (consumerDeployList != null) {
         consumerDeployList.clear();
@@ -2625,6 +2626,7 @@ public class CPMEngine extends Thread {
       consumers = null;
 
       processingUnits = null;
+      processingUnitResults = null;
       casprocessorList = null;
       // this.enProcSt = null;
       stats = null;
@@ -3580,74 +3582,74 @@ public class CPMEngine extends Thread {
     }
   }
 
-  private static class ThreadGroupDestroyer extends Thread {
-    private Set<Thread> foreignThreadsBlockingShutdown = new LinkedHashSet<>();
-
-    private final ThreadGroup group;
-
-    private final AtomicInteger count = new AtomicInteger();
-
-    public ThreadGroupDestroyer(ThreadGroup aGroup) {
-      super(aGroup.getParent(), "threadGroupDestroyer");
-
-      group = aGroup;
-
-    }
-
-    @Override
-    public void run() {
-
-      while (group.activeCount() > 0) {
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-        }
-        Thread[] threads = new Thread[group.activeCount()];
-        group.enumerate(threads);
-        showThreads(threads, foreignThreadsBlockingShutdown, count);
-      }
-
-      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
-        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
-                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-                "UIMA_CPM_destroy_thread_group__FINEST",
-                new Object[] { Thread.currentThread().getName() });
-      }
-
-      group.destroy();
-    }
-
-    private void showThreads(Thread[] aThreadList, Set<Thread> foreignThreadsBlockingShutdown,
-            AtomicInteger count) {
-      count.incrementAndGet();
-
-      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
-        for (int i = 0; aThreadList != null && i < aThreadList.length; i++) {
-          if (aThreadList[i] != null) {
-            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
-                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_show_thread__FINEST",
-                    new Object[] { Thread.currentThread().getName(), String.valueOf(i),
-                        aThreadList[i].getName() });
-          }
-        }
-      }
-
-      if (count.get() % 5 == 0) {
-        for (Thread thread : aThreadList) {
-          Set<Thread> nonUimaThreads = new HashSet<>();
-          if (!(thread instanceof ProcessingUnit) && !(thread instanceof ArtifactProducer)
-                  && !foreignThreadsBlockingShutdown.contains(thread)) {
-            nonUimaThreads.add(thread);
-          }
-
-          if (nonUimaThreads.size() == aThreadList.length) {
-            UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
-                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unknown_thread__WARNING",
-                    new Object[] { thread.getName() });
-            foreignThreadsBlockingShutdown.add(thread);
-          }
-        }
-      }
-    }
-  }
+  // private static class ThreadGroupDestroyer extends Thread {
+  // private Set<Thread> foreignThreadsBlockingShutdown = new LinkedHashSet<>();
+  //
+  // private final ThreadGroup group;
+  //
+  // private final AtomicInteger count = new AtomicInteger();
+  //
+  // public ThreadGroupDestroyer(ThreadGroup aGroup) {
+  // super(aGroup.getParent(), "threadGroupDestroyer");
+  //
+  // group = aGroup;
+  //
+  // }
+  //
+  // @Override
+  // public void run() {
+  //
+  // while (group.activeCount() > 0) {
+  // try {
+  // Thread.sleep(100);
+  // } catch (InterruptedException e) {
+  // }
+  // Thread[] threads = new Thread[group.activeCount()];
+  // group.enumerate(threads);
+  // showThreads(threads, foreignThreadsBlockingShutdown, count);
+  // }
+  //
+  // if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+  // UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
+  // "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
+  // "UIMA_CPM_destroy_thread_group__FINEST",
+  // new Object[] { Thread.currentThread().getName() });
+  // }
+  //
+  // group.destroy();
+  // }
+  //
+  // private void showThreads(Thread[] aThreadList, Set<Thread> foreignThreadsBlockingShutdown,
+  // AtomicInteger count) {
+  // count.incrementAndGet();
+  //
+  // if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+  // for (int i = 0; aThreadList != null && i < aThreadList.length; i++) {
+  // if (aThreadList[i] != null) {
+  // UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
+  // "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_show_thread__FINEST",
+  // new Object[] { Thread.currentThread().getName(), String.valueOf(i),
+  // aThreadList[i].getName() });
+  // }
+  // }
+  // }
+  //
+  // if (count.get() % 5 == 0) {
+  // for (Thread thread : aThreadList) {
+  // Set<Thread> nonUimaThreads = new HashSet<>();
+  // if (!(thread instanceof ProcessingUnit) && !(thread instanceof ArtifactProducer)
+  // && !foreignThreadsBlockingShutdown.contains(thread)) {
+  // nonUimaThreads.add(thread);
+  // }
+  //
+  // if (nonUimaThreads.size() == aThreadList.length) {
+  // UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
+  // "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unknown_thread__WARNING",
+  // new Object[] { thread.getName() });
+  // foreignThreadsBlockingShutdown.add(thread);
+  // }
+  // }
+  // }
+  // }
+  // }
 }
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMThreadGroup.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMExecutorService.java
similarity index 55%
rename from uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMThreadGroup.java
rename to uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMExecutorService.java
index c01f7df..f676145 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMThreadGroup.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMExecutorService.java
@@ -19,105 +19,104 @@
 
 package org.apache.uima.collection.impl.cpm.engine;
 
-import java.util.ArrayList;
+import static org.apache.uima.UIMAFramework.getLogger;
+import static org.apache.uima.collection.impl.cpm.utils.CPMUtils.CPM_LOG_RESOURCE_BUNDLE;
+import static org.apache.uima.util.Level.FINER;
+import static org.apache.uima.util.Level.SEVERE;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.uima.UIMAFramework;
 import org.apache.uima.collection.StatusCallbackListener;
 import org.apache.uima.collection.base_cpm.BaseStatusCallbackListener;
 import org.apache.uima.collection.impl.EntityProcessStatusImpl;
-import org.apache.uima.collection.impl.cpm.utils.CPMUtils;
-import org.apache.uima.util.Level;
 import org.apache.uima.util.ProcessTrace;
 
-
 /**
  * This component catches uncaught errors in the CPM. All critical threads in the CPM are part of
  * this ThreadGroup. If OutOfMemory Error is thrown this component is notified by the JVM and its
  * job is to notify registered listeners.
- * 
  */
-public class CPMThreadGroup extends ThreadGroup {
-  
-  /** The callback listeners. */
-  private ArrayList callbackListeners = null;
+public class CPMExecutorService extends ThreadPoolExecutor {
 
-  /** The proc tr. */
-  private ProcessTrace procTr = null;
+  private List<BaseStatusCallbackListener> callbackListeners = null;
 
-  /**
-   * Instantiates a new CPM thread group.
-   *
-   * @param name the name
-   */
-  public CPMThreadGroup(String name) {
-    super(name);
-  }
+  private ProcessTrace procTr = null;
 
-  /**
-   * Instantiates a new CPM thread group.
-   *
-   * @param parent -
-   *          parent thread group
-   * @param name -
-   *          name of this thread group
-   */
-  public CPMThreadGroup(ThreadGroup parent, String name) {
-    super(parent, name);
+  public CPMExecutorService() {
+    super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
   }
 
   /**
    * Sets listeners to be used in notifications.
    *
-   * @param aListenerList -
-   *          list of registered listners
+   * @param aListenerList
+   *          list of registered listeners
    */
-  public void setListeners(ArrayList aListenerList) {
+  public void setListeners(List<BaseStatusCallbackListener> aListenerList) {
     callbackListeners = aListenerList;
   }
 
   /**
    * Sets the process trace.
    *
-   * @param aProcessTrace the new process trace
+   * @param aProcessTrace
+   *          the new process trace
    */
   public void setProcessTrace(ProcessTrace aProcessTrace) {
     procTr = aProcessTrace;
   }
 
-  /* (non-Javadoc)
-   * @see java.lang.ThreadGroup#uncaughtException(java.lang.Thread, java.lang.Throwable)
-   */
   @Override
-  public void uncaughtException(Thread t, Throwable e) {
-    if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
-      UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
-              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unhandled_error__SEVERE",
-              new Object[] { Thread.currentThread().getName(), e.getClass().getName() });
+  protected void afterExecute(Runnable aThread, Throwable aThrowable) {
+    Throwable throwable = aThrowable;
+    if (throwable == null && aThread instanceof FutureTask) {
+      try {
+        ((FutureTask<?>) aThread).get();
+      } catch (InterruptedException e) {
+        // Ignore
+      } catch (ExecutionException e) {
+        throwable = e.getCause();
+      }
+    }
+
+    if (throwable == null) {
+      return;
+    }
+
+    if (getLogger().isLoggable(SEVERE)) {
+      getLogger(this.getClass()).logrb(SEVERE, this.getClass().getName(), "process",
+              CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_unhandled_error__SEVERE",
+              new Object[] { Thread.currentThread().getName(), throwable.getClass().getName() });
 
     }
     try {
       // Notify listeners
       for (int i = 0; callbackListeners != null && i < callbackListeners.size(); i++) {
-        // System.out.println("ThreadGroup.uncaughtException()-Got Error - Notifying Listener");
-        notifyListener((BaseStatusCallbackListener) callbackListeners.get(i), e);
+        notifyListener((BaseStatusCallbackListener) callbackListeners.get(i), throwable);
       }
 
     } catch (Throwable tr) {
-      if (UIMAFramework.getLogger().isLoggable(Level.FINER)) {
-        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
-                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
+      if (getLogger().isLoggable(FINER)) {
+        getLogger(this.getClass()).logrb(FINER, this.getClass().getName(), "process",
+                CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
                 new Object[] { Thread.currentThread().getName(), tr.getClass().getName() });
         tr.printStackTrace();
       }
     }
-    // System.out.println("ThreadGroup.uncaughtException()-Done Handling Error");
   }
 
   /**
    * Notify listener.
    *
-   * @param aStatCL the a stat CL
-   * @param e the e
+   * @param aStatCL
+   *          the status callback listener to notify
+   * @param e
+   *          the throwable to report to the listener
    */
   private void notifyListener(BaseStatusCallbackListener aStatCL, Throwable e) {
     EntityProcessStatusImpl enProcSt = new EntityProcessStatusImpl(procTr);
diff --git a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java
index 806fe99..b380675 100644
--- a/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java
+++ b/uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/DebugControlThread.java
@@ -29,12 +29,11 @@ import org.apache.uima.collection.impl.cpm.utils.CpmLocalizedMessage;
 import org.apache.uima.util.FileUtils;
 import org.apache.uima.util.Level;
 
-
 /**
  * The Class DebugControlThread.
  */
 public class DebugControlThread implements Runnable {
-  
+
   /** The Constant NOTFOUND. */
   private final static String NOTFOUND = "NOT-FOUND";
 
@@ -61,9 +60,12 @@ public class DebugControlThread implements Runnable {
   /**
    * Instantiates a new debug control thread.
    *
-   * @param aCpm the a cpm
-   * @param aFilename the a filename
-   * @param aCheckpointFrequency the a checkpoint frequency
+   * @param aCpm
+   *          the CPM engine
+   * @param aFilename
+   *          the name of the debug control file
+   * @param aCheckpointFrequency
+   *          the checkpoint frequency
    */
   public DebugControlThread(CPMEngine aCpm, String aFilename, int aCheckpointFrequency) {
     cpm = aCpm;
@@ -74,7 +76,8 @@ public class DebugControlThread implements Runnable {
   /**
    * Start.
    *
-   * @throws RuntimeException the runtime exception
+   * @throws RuntimeException
+   *           the runtime exception
    */
   public void start() throws RuntimeException {
     if (fileName == null) {
@@ -84,10 +87,10 @@ public class DebugControlThread implements Runnable {
                 "UIMA_CPM_checkpoint_target_not_defined__FINEST",
                 new Object[] { Thread.currentThread().getName() });
       }
-      throw new RuntimeException(CpmLocalizedMessage.getLocalizedMessage(
-              CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
-              "UIMA_CPM_EXP_target_checkpoint_not_defined__WARNING", new Object[] { Thread
-                      .currentThread().getName() }));
+      throw new RuntimeException(
+              CpmLocalizedMessage.getLocalizedMessage(CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
+                      "UIMA_CPM_EXP_target_checkpoint_not_defined__WARNING",
+                      new Object[] { Thread.currentThread().getName() }));
     }
     if (cpm == null) {
       if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
@@ -100,7 +103,7 @@ public class DebugControlThread implements Runnable {
               CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_EXP_invalid_cpm__WARNING",
               new Object[] { Thread.currentThread().getName() }));
     } else {
-      new Thread(this).start();
+      cpm.getExecutorService().submit(this);
     }
   }
 
@@ -113,7 +116,9 @@ public class DebugControlThread implements Runnable {
     doCheckpoint();
   }
 
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see java.lang.Runnable#run()
    */
   @Override
@@ -147,7 +152,8 @@ public class DebugControlThread implements Runnable {
   /**
    * Interpret and execute command.
    *
-   * @param aCommand the a command
+   * @param aCommand
+   *          the command to interpret and execute
    */
   private void interpretAndExecuteCommand(String aCommand) {
     if (aCommand == null) {