You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:27:05 UTC
svn commit: r810567 [2/3] - in
/incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms:
./ client/ message/ service/
Modified: incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=810567&r1=810566&r2=810567&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Wed Sep 2 15:27:04 2009
@@ -83,1537 +83,1577 @@
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.delegate.Delegate.DelegateEntry;
-public abstract class BaseUIMAAsynchronousEngineCommon_impl
-implements UimaAsynchronousEngine, MessageListener
-{
- private static final Class CLASS_NAME = BaseUIMAAsynchronousEngineCommon_impl.class;
- protected static final String SHADOW_CAS_POOL = "ShadowCasPool";
- protected static final int MetadataTimeout = 1;
+public abstract class BaseUIMAAsynchronousEngineCommon_impl implements UimaAsynchronousEngine,
+ MessageListener {
+ private static final Class CLASS_NAME = BaseUIMAAsynchronousEngineCommon_impl.class;
- protected static final int CpCTimeout = 2;
+ protected static final String SHADOW_CAS_POOL = "ShadowCasPool";
- protected static final int ProcessTimeout = 3;
-
- protected static final int PingTimeout = 4;
+ protected static final int MetadataTimeout = 1;
- protected volatile boolean initialized;
+ protected static final int CpCTimeout = 2;
- protected List listeners = new ArrayList();
+ protected static final int ProcessTimeout = 3;
- protected AsynchAECasManager asynchManager;
+ protected static final int PingTimeout = 4;
- protected boolean remoteService = false;
+ protected volatile boolean initialized;
- protected CollectionReader collectionReader = null;
+ protected List listeners = new ArrayList();
- protected volatile boolean running = false;
+ protected AsynchAECasManager asynchManager;
- protected ProcessingResourceMetaData resourceMetadata;
+ protected boolean remoteService = false;
- protected CAS sendAndReceiveCAS = null;
+ protected CollectionReader collectionReader = null;
- protected UIDGenerator idGenerator = new UIDGenerator();
+ protected volatile boolean running = false;
- protected ConcurrentHashMap<String,ClientRequest> clientCache =
- new ConcurrentHashMap<String,ClientRequest>();
-
- protected ConcurrentHashMap<Long, ThreadMonitor> threadMonitorMap =
- new ConcurrentHashMap<Long, ThreadMonitor>();
+ protected ProcessingResourceMetaData resourceMetadata;
- // Default timeout for ProcessCas requests
- protected int processTimeout = 0;
+ protected CAS sendAndReceiveCAS = null;
- // Default timeout for GetMeta requests
- protected int metadataTimeout = 60000;
-
- // Default timeout for CpC requests is no timeout
- protected int cpcTimeout = 0;
+ protected UIDGenerator idGenerator = new UIDGenerator();
- protected volatile boolean abort = false;
+ protected ConcurrentHashMap<String, ClientRequest> clientCache = new ConcurrentHashMap<String, ClientRequest>();
- protected static final String uniqueIdentifier = String.valueOf(System.nanoTime());
+ protected ConcurrentHashMap<Long, ThreadMonitor> threadMonitorMap = new ConcurrentHashMap<Long, ThreadMonitor>();
- protected Exception exc;
+ // Default timeout for ProcessCas requests
+ protected int processTimeout = 0;
- // Counter maintaining a number of CASes sent to a service. The counter
- // is incremented every time a CAS is sent and decremented when the CAS
- // reply is received. It is also adjusted down in case of a timeout or
- // error.
- protected AtomicLong outstandingCasRequests = new AtomicLong();
+ // Default timeout for GetMeta requests
+ protected int metadataTimeout = 60000;
+
+ // Default timeout for CpC requests is no timeout
+ protected int cpcTimeout = 0;
+
+ protected volatile boolean abort = false;
+
+ protected static final String uniqueIdentifier = String.valueOf(System.nanoTime());
+
+ protected Exception exc;
+
+ // Counter maintaining a number of CASes sent to a service. The counter
+ // is incremented every time a CAS is sent and decremented when the CAS
+ // reply is received. It is also adjusted down in case of a timeout or
+ // error.
+ protected AtomicLong outstandingCasRequests = new AtomicLong();
protected AtomicLong totalCasRequestsSentBetweenCpCs = new AtomicLong();
protected ConcurrentHashMap springContainerRegistry = new ConcurrentHashMap();
- protected MessageConsumer consumer = null;
+ protected MessageConsumer consumer = null;
+
+ protected String serializationStrategy = "xmi";
+
+ protected UimaASClientInfoMBean clientSideJmxStats = new UimaASClientInfo();
- protected String serializationStrategy = "xmi";
-
- protected UimaASClientInfoMBean clientSideJmxStats =
- new UimaASClientInfo();
-
private UimaSerializer uimaSerializer = new UimaSerializer();
protected ClientServiceDelegate serviceDelegate = null;
-
+
private Object stopMux = new Object();
-
+
private Object sendMux = new Object();
-
- private BlockingQueue<CasQueueEntry> threadQueue =
- new LinkedBlockingQueue<CasQueueEntry>();
-
- private ConcurrentHashMap< Long, CasQueueEntry> threadRegistrar =
- new ConcurrentHashMap<Long, CasQueueEntry>();
-
- private volatile boolean casQueueProducerReady;
-
+
+ private BlockingQueue<CasQueueEntry> threadQueue = new LinkedBlockingQueue<CasQueueEntry>();
+
+ private ConcurrentHashMap<Long, CasQueueEntry> threadRegistrar = new ConcurrentHashMap<Long, CasQueueEntry>();
+
+ private volatile boolean casQueueProducerReady;
+
private Object casProducerMux = new Object();
-
- protected BlockingQueue<PendingMessage> pendingMessageQueue =
- new LinkedBlockingQueue<PendingMessage>();
-
- // Create Semaphore that will signal when the producer object is initialized
- protected Semaphore producerSemaphore = new Semaphore(1);
- // Create Semaphore that will signal when CPC reply has been received
+
+ protected BlockingQueue<PendingMessage> pendingMessageQueue = new LinkedBlockingQueue<PendingMessage>();
+
+ // Create Semaphore that will signal when the producer object is initialized
+ protected Semaphore producerSemaphore = new Semaphore(1);
+
+ // Create Semaphore that will signal when CPC reply has been received
protected Semaphore cpcSemaphore = new Semaphore(1);
- // Create Semaphore that will signal when GetMeta reply has been received
+
+ // Create Semaphore that will signal when GetMeta reply has been received
protected Semaphore getMetaSemaphore = new Semaphore(1);
- // Signals when the client is ready to send CPC request
+
+ // Signals when the client is ready to send CPC request
protected Semaphore cpcReadySemaphore = new Semaphore(1);
+
// Signals receipt of a CPC reply
- protected Semaphore cpcReplySemaphore =
- new Semaphore(1);
-
- protected volatile boolean producerInitialized;
-
- abstract public String getEndPointName() throws Exception;
+ protected Semaphore cpcReplySemaphore = new Semaphore(1);
+
+ protected volatile boolean producerInitialized;
+
+ abstract public String getEndPointName() throws Exception;
+
abstract protected TextMessage createTextMessage() throws Exception;
+
abstract protected BytesMessage createBytesMessage() throws Exception;
- abstract protected void setMetaRequestMessage(Message msg) throws Exception;
- abstract protected void setCASMessage(String casReferenceId, CAS aCAS,Message msg) throws Exception;
- abstract protected void setCASMessage(String casReferenceId, String aSerializedCAS, Message msg) throws Exception;
- abstract protected void setCASMessage(String casReferenceId, byte[] aSerializedCAS, Message msg) throws Exception;
- abstract public void setCPCMessage(Message msg) throws Exception;
- abstract public void initialize(Map anApplicationContext) throws ResourceInitializationException;
- abstract protected void cleanup() throws Exception;
- abstract public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext) throws Exception;
- abstract protected String deploySpringContainer(String[] springContextFiles) throws ResourceInitializationException;
-
- public void addStatusCallbackListener(UimaAsBaseCallbackListener aListener)
- {
- listeners.add(aListener);
- }
-
- public String getSerializationStrategy()
- {
- return serializationStrategy;
- }
-
+
+ abstract protected void setMetaRequestMessage(Message msg) throws Exception;
+
+ abstract protected void setCASMessage(String casReferenceId, CAS aCAS, Message msg)
+ throws Exception;
+
+ abstract protected void setCASMessage(String casReferenceId, String aSerializedCAS, Message msg)
+ throws Exception;
+
+ abstract protected void setCASMessage(String casReferenceId, byte[] aSerializedCAS, Message msg)
+ throws Exception;
+
+ abstract public void setCPCMessage(Message msg) throws Exception;
+
+ abstract public void initialize(Map anApplicationContext) throws ResourceInitializationException;
+
+ abstract protected void cleanup() throws Exception;
+
+ abstract public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext)
+ throws Exception;
+
+ abstract protected String deploySpringContainer(String[] springContextFiles)
+ throws ResourceInitializationException;
+
+ public void addStatusCallbackListener(UimaAsBaseCallbackListener aListener) {
+ listeners.add(aListener);
+ }
+
+ public String getSerializationStrategy() {
+ return serializationStrategy;
+ }
+
protected void setSerializationStrategy(String aSerializationStrategy) {
serializationStrategy = aSerializationStrategy;
}
- /**
- * Serializes a given CAS.
- *
- * @param aCAS - CAS to serialize
- * @return - serialized CAS
- *
- * @throws Exception
- */
- protected String serializeCAS(CAS aCAS, XmiSerializationSharedData serSharedData) throws Exception
- {
- return uimaSerializer.serializeCasToXmi(aCAS, serSharedData);
- }
-
- protected String serializeCAS(CAS aCAS) throws Exception
- {
- XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
- return uimaSerializer.serializeCasToXmi(aCAS, serSharedData);
- }
-
- public void removeStatusCallbackListener(UimaAsBaseCallbackListener aListener)
- {
- listeners.remove(aListener);
- }
- public void onBeforeMessageSend(UimaASProcessStatus status) {
- for (int i = 0; listeners != null && i < listeners.size(); i++)
- {
- UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
- statCL.onBeforeMessageSend(status);
- }
- }
- public synchronized void setCollectionReader(CollectionReader aCollectionReader) throws ResourceInitializationException
- {
- if ( initialized )
- {
- // Uima ee client has already been initialized. CR should be
- // set before calling initialize()
- throw new ResourceInitializationException();
- }
- collectionReader = aCollectionReader;
- }
-
- private void addMessage(PendingMessage msg ) {
+
+ /**
+ * Serializes a given CAS.
+ *
+ * @param aCAS
+ * - CAS to serialize
+ * @return - serialized CAS
+ *
+ * @throws Exception
+ */
+ protected String serializeCAS(CAS aCAS, XmiSerializationSharedData serSharedData)
+ throws Exception {
+ return uimaSerializer.serializeCasToXmi(aCAS, serSharedData);
+ }
+
+ protected String serializeCAS(CAS aCAS) throws Exception {
+ XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
+ return uimaSerializer.serializeCasToXmi(aCAS, serSharedData);
+ }
+
+ public void removeStatusCallbackListener(UimaAsBaseCallbackListener aListener) {
+ listeners.remove(aListener);
+ }
+
+ public void onBeforeMessageSend(UimaASProcessStatus status) {
+ for (int i = 0; listeners != null && i < listeners.size(); i++) {
+ UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
+ statCL.onBeforeMessageSend(status);
+ }
+ }
+
+ public synchronized void setCollectionReader(CollectionReader aCollectionReader)
+ throws ResourceInitializationException {
+ if (initialized) {
+ // Uima ee client has already been initialized. CR should be
+ // set before calling initialize()
+ throw new ResourceInitializationException();
+ }
+ collectionReader = aCollectionReader;
+ }
+
+ private void addMessage(PendingMessage msg) {
pendingMessageQueue.add(msg);
- }
+ }
- protected void acquireCpcReadySemaphore() {
- try {
- // Acquire cpcReady semaphore to block sending CPC request until
- // ALL outstanding CASes are received.
- cpcReadySemaphore.acquire();
- } catch( InterruptedException e) {
- System.out.println("UIMA AS Client Interrupted While Attempting To Acquire cpcReadySemaphore in initialize()");
- }
- }
- public synchronized void collectionProcessingComplete() throws ResourceProcessException
- {
- try
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_app_cpc_request_FINEST", new Object[] {});
- }
- if ( outstandingCasRequests.get() > 0 ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_in_cpc_INFO",
- new Object[] { outstandingCasRequests.get(), totalCasRequestsSentBetweenCpCs.get() });
- }
- // If the client was initialized but never sent any CASes its cpcReadySemaphore
- // must be first explicitly released to enable the code to send CPC to a service.
- // The semaphore is initially acquired in the initialize(Map) method and typically
- // released when the number of CASes sent equals the number of CASes received. Since
- // no CASes were sent we must do the release here to be able to continue.
- if ( totalCasRequestsSentBetweenCpCs.get() == 0 ) {
- cpcReadySemaphore.release();
- }
- // The cpcReadySemaphore was initially acquired in the initialize() method
- // so below we wait until ALL CASes are processed. Once all
- // CASes are received the semaphore will be released
- acquireCpcReadySemaphore();
-
- if (!running)
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cpc_request_not_done_INFO", new Object[] {});
- }
- return;
- }
-
- ClientRequest requestToCache = new ClientRequest(uniqueIdentifier, this); //, timeout);
- requestToCache.setIsRemote(remoteService);
- requestToCache.setCPCRequest(true);
- requestToCache.setCpcTimeout(cpcTimeout);
- requestToCache.setEndpoint(getEndPointName());
-
- clientCache.put(uniqueIdentifier, requestToCache);
-
- PendingMessage msg = new PendingMessage(AsynchAEMessage.CollectionProcessComplete);
- if (cpcTimeout > 0)
- {
- requestToCache.startTimer();
- msg.put(UimaAsynchronousEngine.CpcTimeout, String.valueOf(cpcTimeout));
- }
+ protected void acquireCpcReadySemaphore() {
+ try {
+ // Acquire cpcReady semaphore to block sending CPC request until
+ // ALL outstanding CASes are received.
+ cpcReadySemaphore.acquire();
+ } catch (InterruptedException e) {
+ System.out
+ .println("UIMA AS Client Interrupted While Attempting To Acquire cpcReadySemaphore in initialize()");
+ }
+ }
+
+ public synchronized void collectionProcessingComplete() throws ResourceProcessException {
+ try {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_started_cpc_request_timer_FINEST", new Object[] {});
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_app_cpc_request_FINEST", new Object[] {});
+ }
+ if (outstandingCasRequests.get() > 0) {
+ UIMAFramework.getLogger(CLASS_NAME)
+ .logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "collectionProcessingComplete",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_in_cpc_INFO",
+ new Object[] { outstandingCasRequests.get(),
+ totalCasRequestsSentBetweenCpCs.get() });
+ }
+ // If the client was initialized but never sent any CASes its cpcReadySemaphore
+ // must be first explicitly released to enable the code to send CPC to a service.
+ // The semaphore is initially acquired in the initialize(Map) method and typically
+ // released when the number of CASes sent equals the number of CASes received. Since
+ // no CASes were sent we must do the release here to be able to continue.
+ if (totalCasRequestsSentBetweenCpCs.get() == 0) {
+ cpcReadySemaphore.release();
+ }
+ // The cpcReadySemaphore was initially acquired in the initialize() method
+ // so below we wait until ALL CASes are processed. Once all
+ // CASes are received the semaphore will be released
+ acquireCpcReadySemaphore();
+
+ if (!running) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_cpc_request_not_done_INFO", new Object[] {});
+ }
+ return;
+ }
+
+ ClientRequest requestToCache = new ClientRequest(uniqueIdentifier, this); // , timeout);
+ requestToCache.setIsRemote(remoteService);
+ requestToCache.setCPCRequest(true);
+ requestToCache.setCpcTimeout(cpcTimeout);
+ requestToCache.setEndpoint(getEndPointName());
+
+ clientCache.put(uniqueIdentifier, requestToCache);
+
+ PendingMessage msg = new PendingMessage(AsynchAEMessage.CollectionProcessComplete);
+ if (cpcTimeout > 0) {
+ requestToCache.startTimer();
+ msg.put(UimaAsynchronousEngine.CpcTimeout, String.valueOf(cpcTimeout));
}
- // Add CPC message to the pending queue
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_started_cpc_request_timer_FINEST", new Object[] {});
+ }
+ // Add CPC message to the pending queue
addMessage(msg);
- // Acquire cpc semaphore. When a CPC reply comes or there is a timeout or the client
- // is stopped, the semaphore will be released.
+ // Acquire cpc semaphore. When a CPC reply comes or there is a timeout or the client
+ // is stopped, the semaphore will be released.
try {
cpcReplySemaphore.acquire();
- } catch( InterruptedException ex) {
+ } catch (InterruptedException ex) {
System.out.println("UIMA AS Cllient Interrupted While Acquiring cpcReplySemaphore");
- }
- // Wait for CPC Reply. This blocks on the cpcReplySemaphore
- waitForCpcReply();
- totalCasRequestsSentBetweenCpCs.set(0); // reset number of CASes sent to a service
- cancelTimer(uniqueIdentifier);
+ }
+ // Wait for CPC Reply. This blocks on the cpcReplySemaphore
+ waitForCpcReply();
+ totalCasRequestsSentBetweenCpCs.set(0); // reset number of CASes sent to a service
+ cancelTimer(uniqueIdentifier);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cancelled_cpc_request_timer_FINEST", new Object[] {});
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "collectionProcessingComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_cancelled_cpc_request_timer_FINEST", new Object[] {});
+ }
+ if (running) {
+ for (int i = 0; listeners != null && i < listeners.size(); i++) {
+ ((UimaASStatusCallbackListener) listeners.get(i)).collectionProcessComplete(null);
+ }
+ }
+ } catch (Exception e) {
+ throw new ResourceProcessException(e);
+ }
+ }
+
+ private void releaseCacheEntries() {
+ Iterator it = clientCache.keySet().iterator();
+ while (it.hasNext()) {
+ ClientRequest entry = clientCache.get((String) it.next());
+ if (entry != null && entry.getCAS() != null) {
+ entry.getCAS().release();
}
- if (running)
- {
- for (int i = 0; listeners != null && i < listeners.size(); i++)
- {
- ((UimaASStatusCallbackListener) listeners.get(i)).collectionProcessComplete(null);
- }
- }
- }
- catch (Exception e)
- {
- throw new ResourceProcessException(e);
- }
- }
- private void releaseCacheEntries() {
- Iterator it = clientCache.keySet().iterator();
- while( it.hasNext() ) {
- ClientRequest entry = clientCache.get((String)it.next());
- if ( entry != null && entry.getCAS() != null ) {
- entry.getCAS().release();
- }
- }
- }
-
- private void clearThreadRegistrar() {
- Iterator it = threadRegistrar.keySet().iterator();
- while( it.hasNext() ) {
- Long key = (Long)it.next();
+ }
+ }
+
+ private void clearThreadRegistrar() {
+ Iterator it = threadRegistrar.keySet().iterator();
+ while (it.hasNext()) {
+ Long key = (Long) it.next();
CasQueueEntry entry = threadRegistrar.get(key);
- if ( entry != null ) {
+ if (entry != null) {
entry.getSemaphore().release();
}
- }
- }
-
-
- public void stop()
- {
- synchronized( stopMux ) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_as_client_INFO", new Object[] {});
- }
- if (!running)
- {
- return;
- }
-
- running = false;
- casQueueProducerReady = false;
- if ( serviceDelegate != null ) {
+ }
+ }
+
+ public void stop() {
+ synchronized (stopMux) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_as_client_INFO",
+ new Object[] {});
+ }
+ if (!running) {
+ return;
+ }
+
+ running = false;
+ casQueueProducerReady = false;
+ if (serviceDelegate != null) {
serviceDelegate.cancelDelegateTimer();
}
- try
- {
- try {
- clearThreadRegistrar();
- releaseCacheEntries();
- } catch( Exception ex) {
- ex.printStackTrace();
- }
-
- // Unblock threads
- if( threadMonitorMap.size() > 0 )
- {
- Iterator it = threadMonitorMap.keySet().iterator();
- while( it.hasNext() )
- {
- long key = ((Long)it.next()).longValue();
- ThreadMonitor threadMonitor =
- (ThreadMonitor)threadMonitorMap.get(key);
- if ( threadMonitor == null || threadMonitor.getMonitor() == null)
- {
- continue;
- }
- threadMonitor.getMonitor().release();
- }
- }
- cpcReadySemaphore.release();
+ try {
+ try {
+ clearThreadRegistrar();
+ releaseCacheEntries();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+
+ // Unblock threads
+ if (threadMonitorMap.size() > 0) {
+ Iterator it = threadMonitorMap.keySet().iterator();
+ while (it.hasNext()) {
+ long key = ((Long) it.next()).longValue();
+ ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(key);
+ if (threadMonitor == null || threadMonitor.getMonitor() == null) {
+ continue;
+ }
+ threadMonitor.getMonitor().release();
+ }
+ }
+ cpcReadySemaphore.release();
outstandingCasRequests.set(0); // reset global counter of outstanding requests
-
+
cpcReplySemaphore.release();
getMetaSemaphore.release();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_as_client_INFO", new Object[] {});
- }
- for (Iterator i = springContainerRegistry.entrySet().iterator(); i.hasNext();)
- {
- Map.Entry entry = (Map.Entry) i.next();
- Object key = entry.getKey();
- undeploy((String) key);
- }
- asynchManager = null;
- springContainerRegistry.clear();
- listeners.clear();
- clientCache.clear();
- threadQueue.clear();
- // Add empty CasQueueEntry object to the queue so that we wake up a reader thread which
- // may be sitting in the threadQueue.take() method. The reader will
- // first check the state of the 'running' flag and find it false which
- // will cause the reader to exit run() method
- threadQueue.add(new CasQueueEntry(new Semaphore(1)));
- threadRegistrar.clear();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- }
- /**
- * This method spins a thread where CASes are distributed to requesting
- * threads in an orderly fashion. CASes are distributed among the
- * threads based on FIFO. Oldest waiting thread receive the CAS first.
- *
- */
- private void serveCASes() {
- synchronized( casProducerMux ) {
- if ( casQueueProducerReady ) {
- return; // Only one CAS producer thread is needed/allowed
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_as_client_INFO",
+ new Object[] {});
+ }
+ for (Iterator i = springContainerRegistry.entrySet().iterator(); i.hasNext();) {
+ Map.Entry entry = (Map.Entry) i.next();
+ Object key = entry.getKey();
+ undeploy((String) key);
+ }
+ asynchManager = null;
+ springContainerRegistry.clear();
+ listeners.clear();
+ clientCache.clear();
+ threadQueue.clear();
+ // Add empty CasQueueEntry object to the queue so that we wake up a reader thread which
+ // may be sitting in the threadQueue.take() method. The reader will
+ // first check the state of the 'running' flag and find it false which
+ // will cause the reader to exit run() method
+ threadQueue.add(new CasQueueEntry(new Semaphore(1)));
+ threadRegistrar.clear();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * This method spins a thread where CASes are distributed to requesting threads in an orderly
+ * fashion. CASes are distributed among the threads based on FIFO. Oldest waiting thread receive
+ * the CAS first.
+ *
+ */
+ private void serveCASes() {
+ synchronized (casProducerMux) {
+ if (casQueueProducerReady) {
+ return; // Only one CAS producer thread is needed/allowed
}
casQueueProducerReady = true;
}
- // Spin a CAS producer thread
+ // Spin a CAS producer thread
new Thread() {
- public void run()
- {
- // terminate when the client API is stopped
- while( running ) {
- try
- {
- // Remove the oldest CAS request from the queue.
- // Every thread requesting a CAS adds an entry to this
- // queue.
+ public void run() {
+ // terminate when the client API is stopped
+ while (running) {
+ try {
+ // Remove the oldest CAS request from the queue.
+ // Every thread requesting a CAS adds an entry to this
+ // queue.
CasQueueEntry entry = threadQueue.take();
- if ( !running ) {
- return; // client API has been stopped
+ if (!running) {
+ return; // client API has been stopped
}
CAS cas = null;
long startTime = System.nanoTime();
- // Wait for a free CAS instance
- if (remoteService)
- {
+ // Wait for a free CAS instance
+ if (remoteService) {
cas = asynchManager.getNewCas("ApplicationCasPoolContext");
- }
- else
- {
+ } else {
cas = asynchManager.getNewCas();
}
long waitingTime = System.nanoTime() - startTime;
- clientSideJmxStats.incrementTotalTimeWaitingForCas( waitingTime );
+ clientSideJmxStats.incrementTotalTimeWaitingForCas(waitingTime);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_new_cas_FINEST",
- new Object[] {"Time Waiting for CAS", (double)waitingTime / (double)1000000});
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "getCAS",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_new_cas_FINEST",
+ new Object[] { "Time Waiting for CAS",
+ (double) waitingTime / (double) 1000000 });
}
- if ( running ) { // only if the client is still running handle the new cas
+ if (running) { // only if the client is still running handle the new cas
try {
- // Associate the CAS with the entry and release the semaphore
+ // Associate the CAS with the entry and release the semaphore
entry.setCas(cas);
} finally {
- entry.getSemaphore().release();
+ entry.getSemaphore().release();
}
} else {
return; // Client is terminating
}
+ } catch (Exception e) {
+ e.printStackTrace();
}
- catch( Exception e) { e.printStackTrace();}
}
}
- }.start();
- }
+ }.start();
+ }
- /**
- * Returns a CAS. If multiple threads call this method, the order of each
- * request is preserved. The oldest waiting thread receives the CAS. Each
- * request for a CAS is queued, and when the CAS becomes available the
- * oldest waiting thread will receive it for processing.
- */
- public CAS getCAS() throws Exception
- {
+ /**
+ * Returns a CAS. If multiple threads call this method, the order of each request is preserved.
+ * The oldest waiting thread receives the CAS. Each request for a CAS is queued, and when the CAS
+ * becomes available the oldest waiting thread will receive it for processing.
+ */
+ public CAS getCAS() throws Exception {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_request_for_cas_FINEST", new Object[] {});
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getCAS",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_request_for_cas_FINEST",
+ new Object[] {});
}
- if ( !running ) {
+ if (!running) {
throw new RuntimeException("Uima AS Client Is Stopping");
}
- if (!initialized )
- {
- throw new ResourceInitializationException();
- }
- // Spin a thread that fetches CASes from the CAS Pool
- if ( !casQueueProducerReady ) {
+ if (!initialized) {
+ throw new ResourceInitializationException();
+ }
+ // Spin a thread that fetches CASes from the CAS Pool
+ if (!casQueueProducerReady) {
serveCASes(); // start CAS producer thread
}
- // Each thread has an entry in the map. The entry is created in the
- // map once and cached.
- CasQueueEntry entry = getQueueEntry( Thread.currentThread().getId());
- // Add this thread entry to the queue of threads waiting for a CAS
+ // Each thread has an entry in the map. The entry is created in the
+ // map once and cached.
+ CasQueueEntry entry = getQueueEntry(Thread.currentThread().getId());
+ // Add this thread entry to the queue of threads waiting for a CAS
threadQueue.add(entry);
- if ( entry != null ) {
+ if (entry != null) {
while (running) {
// Wait until the CAS producer thread adds a CAS to the CasQueueEntry and
- // releases the semaphore.
+ // releases the semaphore.
entry.getSemaphore().acquire();
if (entry.getCas() == null) {
- // Should not happen unless we are terminating
+ // Should not happen unless we are terminating
break;
} else {
return entry.getCas();
}
} // while
}
- return null; // client has terminated
- }
-
- private CasQueueEntry getQueueEntry(long aThreadId ) {
- CasQueueEntry entry = null;
- if ( threadRegistrar.containsKey(aThreadId ) ) {
- entry = threadRegistrar.get(aThreadId);
- } else {
- entry = new CasQueueEntry(new Semaphore(1));
- threadRegistrar.put(aThreadId, entry);
- }
- return entry;
- }
-
- protected void reset() {
- }
-
- private static class CasQueueEntry {
- private CAS cas;
- private Semaphore semaphore = new Semaphore(1);
+ return null; // client has terminated
+ }
+
+ private CasQueueEntry getQueueEntry(long aThreadId) {
+ CasQueueEntry entry = null;
+ if (threadRegistrar.containsKey(aThreadId)) {
+ entry = threadRegistrar.get(aThreadId);
+ } else {
+ entry = new CasQueueEntry(new Semaphore(1));
+ threadRegistrar.put(aThreadId, entry);
+ }
+ return entry;
+ }
+
+ protected void reset() {
+ }
+
+ private static class CasQueueEntry {
+ private CAS cas;
+
+ private Semaphore semaphore = new Semaphore(1);
+
public CAS getCas() {
return cas;
}
+
public void setCas(CAS cas) {
this.cas = cas;
}
- public CasQueueEntry( Semaphore aSharedSemaphore ) {
+
+ public CasQueueEntry(Semaphore aSharedSemaphore) {
semaphore = aSharedSemaphore;
}
+
public Semaphore getSemaphore() {
return semaphore;
}
-
- }
-
- protected void sendMetaRequest() throws Exception
- {
- PendingMessage msg = new PendingMessage(AsynchAEMessage.GetMeta);
- ClientRequest requestToCache = new ClientRequest(uniqueIdentifier, this); //, metadataTimeout);
- requestToCache.setIsRemote(remoteService);
- requestToCache.setMetaRequest(true);
- requestToCache.setMetadataTimeout(metadataTimeout);
-
- requestToCache.setEndpoint(getEndPointName());
-
- clientCache.put(uniqueIdentifier, requestToCache);
- if (metadataTimeout > 0)
- {
- serviceDelegate.startGetMetaRequestTimer();
- msg.put(UimaAsynchronousEngine.GetMetaTimeout, String.valueOf(metadataTimeout));
- }
- // Add message to the pending queue
+
+ }
+
+ protected void sendMetaRequest() throws Exception {
+ PendingMessage msg = new PendingMessage(AsynchAEMessage.GetMeta);
+ ClientRequest requestToCache = new ClientRequest(uniqueIdentifier, this); // , metadataTimeout);
+ requestToCache.setIsRemote(remoteService);
+ requestToCache.setMetaRequest(true);
+ requestToCache.setMetadataTimeout(metadataTimeout);
+
+ requestToCache.setEndpoint(getEndPointName());
+
+ clientCache.put(uniqueIdentifier, requestToCache);
+ if (metadataTimeout > 0) {
+ serviceDelegate.startGetMetaRequestTimer();
+ msg.put(UimaAsynchronousEngine.GetMetaTimeout, String.valueOf(metadataTimeout));
+ }
+ // Add message to the pending queue
addMessage(msg);
- }
- protected void waitForCpcReply()
- {
- try {
- // wait for CPC reply
- cpcReplySemaphore.acquire();
- } catch( InterruptedException e) {
-
- } finally {
- cpcReplySemaphore.release();
- }
- }
- /**
- * Blocks while trying to acquire a semaphore awaiting receipt of GetMeta Reply.
- * When the GetMeta is received, or there is a timeout, or the client stops the
- * semaphore will be released.
- */
- protected void waitForMetadataReply()
- {
- try {
+ }
+
+ protected void waitForCpcReply() {
+ try {
+ // wait for CPC reply
+ cpcReplySemaphore.acquire();
+ } catch (InterruptedException e) {
+
+ } finally {
+ cpcReplySemaphore.release();
+ }
+ }
+
+ /**
+ * Blocks while trying to acquire a semaphore awaiting receipt of GetMeta Reply. When the GetMeta
+ * is received, or there is a timeout, or the client stops the semaphore will be released.
+ */
+ protected void waitForMetadataReply() {
+ try {
getMetaSemaphore.acquire();
- } catch( InterruptedException e) {
-
+ } catch (InterruptedException e) {
+
} finally {
getMetaSemaphore.release();
}
- }
+ }
- public String getPerformanceReport()
- {
- return null;
- }
-
- public synchronized void process() throws ResourceProcessException
- {
- if (!initialized)
- {
- throw new ResourceProcessException();
- }
- if (collectionReader == null)
- {
- throw new ResourceProcessException();
- }
- if ( !casQueueProducerReady ) {
+ public String getPerformanceReport() {
+ return null;
+ }
+
+ public synchronized void process() throws ResourceProcessException {
+ if (!initialized) {
+ throw new ResourceProcessException();
+ }
+ if (collectionReader == null) {
+ throw new ResourceProcessException();
+ }
+ if (!casQueueProducerReady) {
serveCASes(); // start CAS producer thread
}
- try
- {
- CAS cas = null;
- boolean hasNext = true;
- while ((hasNext = collectionReader.hasNext()) == true)
- {
- if (initialized && running)
- {
- cas = getCAS();
- collectionReader.getNext(cas);
- sendCAS(cas);
- }
- else
- {
- break;
- }
- }
-
- if (hasNext == false)
- {
- collectionProcessingComplete();
- }
- }
- catch (Exception e)
- {
- throw new ResourceProcessException(e);
- }
- }
- protected ConcurrentHashMap getCache()
- {
- return clientCache;
- }
- /**
- * Sends a given CAS for analysis to the UIMA EE Service.
- *
- */
- private String sendCAS(CAS aCAS, ClientRequest requestToCache) throws ResourceProcessException
- {
- synchronized( sendMux ) {
- String casReferenceId = requestToCache.getCasReferenceId();
- try
- {
- if (!running)
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_sending_cas_INFO", new Object[] { "Asynchronous Client is Stopping" });
- }
- return null;
- }
-
- PendingMessage msg = new PendingMessage(AsynchAEMessage.Process);
- long t1 = System.nanoTime();
- if ( serializationStrategy.equals("xmi")) {
- XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
- String serializedCAS = serializeCAS(aCAS, serSharedData);
- msg.put( AsynchAEMessage.CAS, serializedCAS);
- if (remoteService)
- {
- requestToCache.setCAS(aCAS);
- // Store the serialized CAS in case the timeout occurs and need to send the
- // the offending CAS to listeners for reporting
- requestToCache.setCAS(serializedCAS);
- requestToCache.setXmiSerializationSharedData(serSharedData);
- }
- } else {
- byte[] serializedCAS = uimaSerializer.serializeCasToBinary(aCAS);
- msg.put( AsynchAEMessage.CAS, serializedCAS);
- if (remoteService)
- {
- requestToCache.setCAS(aCAS);
- }
- }
-
- requestToCache.setSerializationTime(System.nanoTime()-t1);
- msg.put( AsynchAEMessage.CasReference, casReferenceId);
- requestToCache.setIsRemote(remoteService);
- requestToCache.setEndpoint(getEndPointName());
- requestToCache.setProcessTimeout(processTimeout);
- requestToCache.setThreadId(Thread.currentThread().getId());
- requestToCache.clearTimeoutException();
-
- clientCache.put(casReferenceId, requestToCache);
- // The sendCAS() method is synchronized no need to synchronize the code
- // below
- if ( serviceDelegate.getState() == Delegate.TIMEOUT_STATE &&
- !serviceDelegate.isAwaitingPingReply()) {
- serviceDelegate.setAwaitingPingReply();
- System.out.println("--------------> Client Sending Ping Message");
- // Send PING Request to check delegate's availability
- sendMetaRequest();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_ping__FINE",
- new Object[] { serviceDelegate.getKey() });
- }
- }
- // Check delegate's state before sending it a CAS. The delegate
- // may have previously timed out and the client is in a process of pinging
- // the delegate to check its availability. While the delegate
- // is in this state, delay CASes by placing them on a list of
- // CASes pending dispatch. Once the ping reply is received all
- // delayed CASes will be dispatched to the delegate.
- if ( !delayCasIfDelegateInTimedOutState( casReferenceId) ) {
- // The delegate state is normal, add the CAS Id to the list
- // of CASes sent to the delegate.
- serviceDelegate.addCasToOutstandingList(casReferenceId);
- } else {
- System.out.println("..... Delaying CAS. Awaiting Ping Reply");
- // CAS was added to the list of CASes pending dispatch. The service
- // has previously timed out. A Ping message was dispatched to test
- // service availability. When the Ping reply is received ALL CASes
- // from the list of CASes pending dispatch will be sent to the
- // delegate.
- return casReferenceId;
- }
+ try {
+ CAS cas = null;
+ boolean hasNext = true;
+ while ((hasNext = collectionReader.hasNext()) == true) {
+ if (initialized && running) {
+ cas = getCAS();
+ collectionReader.getNext(cas);
+ sendCAS(cas);
+ } else {
+ break;
+ }
+ }
+
+ if (hasNext == false) {
+ collectionProcessingComplete();
+ }
+ } catch (Exception e) {
+ throw new ResourceProcessException(e);
+ }
+ }
+
+ protected ConcurrentHashMap getCache() {
+ return clientCache;
+ }
+
+ /**
+ * Sends a given CAS for analysis to the UIMA EE Service.
+ *
+ */
+ private String sendCAS(CAS aCAS, ClientRequest requestToCache) throws ResourceProcessException {
+ synchronized (sendMux) {
+ String casReferenceId = requestToCache.getCasReferenceId();
+ try {
+ if (!running) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendCAS",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_sending_cas_INFO",
+ new Object[] { "Asynchronous Client is Stopping" });
+ }
+ return null;
+ }
+
+ PendingMessage msg = new PendingMessage(AsynchAEMessage.Process);
+ long t1 = System.nanoTime();
+ if (serializationStrategy.equals("xmi")) {
+ XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
+ String serializedCAS = serializeCAS(aCAS, serSharedData);
+ msg.put(AsynchAEMessage.CAS, serializedCAS);
+ if (remoteService) {
+ requestToCache.setCAS(aCAS);
+ // Store the serialized CAS in case the timeout occurs and need to send the
+ // the offending CAS to listeners for reporting
+ requestToCache.setCAS(serializedCAS);
+ requestToCache.setXmiSerializationSharedData(serSharedData);
+ }
+ } else {
+ byte[] serializedCAS = uimaSerializer.serializeCasToBinary(aCAS);
+ msg.put(AsynchAEMessage.CAS, serializedCAS);
+ if (remoteService) {
+ requestToCache.setCAS(aCAS);
+ }
+ }
+
+ requestToCache.setSerializationTime(System.nanoTime() - t1);
+ msg.put(AsynchAEMessage.CasReference, casReferenceId);
+ requestToCache.setIsRemote(remoteService);
+ requestToCache.setEndpoint(getEndPointName());
+ requestToCache.setProcessTimeout(processTimeout);
+ requestToCache.setThreadId(Thread.currentThread().getId());
+ requestToCache.clearTimeoutException();
+
+ clientCache.put(casReferenceId, requestToCache);
+ // The sendCAS() method is synchronized no need to synchronize the code
+ // below
+ if (serviceDelegate.getState() == Delegate.TIMEOUT_STATE
+ && !serviceDelegate.isAwaitingPingReply()) {
+ serviceDelegate.setAwaitingPingReply();
+ System.out.println("--------------> Client Sending Ping Message");
+ // Send PING Request to check delegate's availability
+ sendMetaRequest();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendCAS",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_ping__FINE",
+ new Object[] { serviceDelegate.getKey() });
+ }
+ }
+ // Check delegate's state before sending it a CAS. The delegate
+ // may have previously timed out and the client is in a process of pinging
+ // the delegate to check its availability. While the delegate
+ // is in this state, delay CASes by placing them on a list of
+ // CASes pending dispatch. Once the ping reply is received all
+ // delayed CASes will be dispatched to the delegate.
+ if (!delayCasIfDelegateInTimedOutState(casReferenceId)) {
+ // The delegate state is normal, add the CAS Id to the list
+ // of CASes sent to the delegate.
+ serviceDelegate.addCasToOutstandingList(casReferenceId);
+ } else {
+ System.out.println("..... Delaying CAS. Awaiting Ping Reply");
+ // CAS was added to the list of CASes pending dispatch. The service
+ // has previously timed out. A Ping message was dispatched to test
+ // service availability. When the Ping reply is received ALL CASes
+ // from the list of CASes pending dispatch will be sent to the
+ // delegate.
+ return casReferenceId;
+ }
// Incremented number of outstanding CASes sent to a service. When a reply comes
// this counter is decremented
outstandingCasRequests.incrementAndGet();
- // Increment total number of CASes sent to a service. This is reset
- // on CPC
+ // Increment total number of CASes sent to a service. This is reset
+ // on CPC
totalCasRequestsSentBetweenCpCs.incrementAndGet();
- // Add message to the pending queue
- addMessage(msg);
- }
- catch (Exception e)
- {
- throw new ResourceProcessException(e);
- }
- return casReferenceId;
- }
- }
-
+ // Add message to the pending queue
+ addMessage(msg);
+ } catch (Exception e) {
+ throw new ResourceProcessException(e);
+ }
+ return casReferenceId;
+ }
+ }
+
/**
- * Checks the state of a delegate to see if it is in TIMEOUT State.
- * If it is, push the CAS id onto a list of CASes pending dispatch.
- * The delegate is in a questionable state and the aggregate sends
- * a ping message to check delegate's availability. If the delegate
- * responds to the ping, all CASes in the pending dispatch list will
- * be immediately dispatched.
- **/
- public boolean delayCasIfDelegateInTimedOutState( String aCasReferenceId ) throws AsynchAEException {
- if (serviceDelegate != null && serviceDelegate.getState() == Delegate.TIMEOUT_STATE ) {
+ * Checks the state of a delegate to see if it is in TIMEOUT State. If it is, push the CAS id onto
+ * a list of CASes pending dispatch. The delegate is in a questionable state and the aggregate
+ * sends a ping message to check delegate's availability. If the delegate responds to the ping,
+ * all CASes in the pending dispatch list will be immediately dispatched.
+ **/
+ public boolean delayCasIfDelegateInTimedOutState(String aCasReferenceId) throws AsynchAEException {
+ if (serviceDelegate != null && serviceDelegate.getState() == Delegate.TIMEOUT_STATE) {
// Add CAS id to the list of delayed CASes.
serviceDelegate.addCasToPendingDispatchList(aCasReferenceId);
return true;
}
- return false; // Cas Not Delayed
+ return false; // Cas Not Delayed
}
-
- private ClientRequest produceNewClientRequestObject()
- {
- String casReferenceId = idGenerator.nextId();
- return new ClientRequest(casReferenceId, this);
- }
- /**
- * Sends a given CAS for analysis to the UIMA EE Service.
- *
- */
- public synchronized String sendCAS(CAS aCAS) throws ResourceProcessException
- {
- return this.sendCAS(aCAS, produceNewClientRequestObject());
- }
-
- /**
- * Handles response to CollectionProcessComplete request.
- *
- * @throws Exception
- */
- protected void handleCollectionProcessCompleteReply(Message message) throws Exception
- {
- int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
- try {
- if (AsynchAEMessage.Exception == payload)
- {
- ProcessTrace pt = new ProcessTrace_impl();
- UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
- Exception exception = retrieveExceptionFromMessage(message);
-
- status.addEventStatus("CpC", "Failed", exception);
- notifyListeners(null, status, AsynchAEMessage.CollectionProcessComplete);
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleCollectionProcessCompleteReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_exception_msg_INFO",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), exception });
- }
- }
- else
- {
- // After receiving CPC reply there may be cleanup to do. Delegate this
- // to platform specific implementation (ActiveMQ or WAS)
- cleanup();
- }
- } catch( Exception e ) {
- throw e;
- } finally {
- // Release the semaphore acquired in collectionProcessingComplete()
- cpcReplySemaphore.release();
- }
- }
-
- /**
- * Handles response to GetMeta Request. Deserializes ResourceMetaData and initializes CasManager.
- *
- * @param message -
- * jms message containing serialized ResourceMetaData
- *
- * @throws Exception
- */
- protected void handleMetadataReply(Message message) throws Exception
- {
- serviceDelegate.cancelDelegateTimer();
- serviceDelegate.setState(Delegate.OK_STATE);
- // check if the reply msg contains replyTo destination. I will be
- // added by the Cas Multiplier to the getMeta reply
- if ( message.getJMSReplyTo() != null ) {
+
+ private ClientRequest produceNewClientRequestObject() {
+ String casReferenceId = idGenerator.nextId();
+ return new ClientRequest(casReferenceId, this);
+ }
+
+ /**
+ * Sends a given CAS for analysis to the UIMA EE Service.
+ *
+ */
+ public synchronized String sendCAS(CAS aCAS) throws ResourceProcessException {
+ return this.sendCAS(aCAS, produceNewClientRequestObject());
+ }
+
+ /**
+ * Handles response to CollectionProcessComplete request.
+ *
+ * @throws Exception
+ */
+ protected void handleCollectionProcessCompleteReply(Message message) throws Exception {
+ int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+ try {
+ if (AsynchAEMessage.Exception == payload) {
+ ProcessTrace pt = new ProcessTrace_impl();
+ UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+ Exception exception = retrieveExceptionFromMessage(message);
+
+ status.addEventStatus("CpC", "Failed", exception);
+ notifyListeners(null, status, AsynchAEMessage.CollectionProcessComplete);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "handleCollectionProcessCompleteReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_received_exception_msg_INFO",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
+ message.getStringProperty(AsynchAEMessage.CasReference), exception });
+ }
+ } else {
+ // After receiving CPC reply there may be cleanup to do. Delegate this
+ // to platform specific implementation (ActiveMQ or WAS)
+ cleanup();
+ }
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ // Release the semaphore acquired in collectionProcessingComplete()
+ cpcReplySemaphore.release();
+ }
+ }
+
+ /**
+ * Handles response to GetMeta Request. Deserializes ResourceMetaData and initializes CasManager.
+ *
+ * @param message
+ * - jms message containing serialized ResourceMetaData
+ *
+ * @throws Exception
+ */
+ protected void handleMetadataReply(Message message) throws Exception {
+ serviceDelegate.cancelDelegateTimer();
+ serviceDelegate.setState(Delegate.OK_STATE);
+ // check if the reply msg contains replyTo destination. I will be
+ // added by the Cas Multiplier to the getMeta reply
+ if (message.getJMSReplyTo() != null) {
serviceDelegate.setFreeCasDestination(message.getJMSReplyTo());
}
- // Check if this is a reply for Ping sent in response to a timeout
- if ( serviceDelegate.isAwaitingPingReply() ) {
+ // Check if this is a reply for Ping sent in response to a timeout
+ if (serviceDelegate.isAwaitingPingReply()) {
serviceDelegate.resetAwaitingPingReply();
String casReferenceId = null;
//
- if ( serviceDelegate.isSynchronousAPI() ) {
- // Synchronous API used for sending outgoing messages.
- // Notify ALL sending threads. A send thread may have
- // added a CAS to the list of pending CASes due to
- // a timeout and subsequently entered a wait state
- // waiting for this notification. The CAS this thread
- // was trying to deliver will be taken off the pending
- // dispatch list and send to the service.
+ if (serviceDelegate.isSynchronousAPI()) {
+ // Synchronous API used for sending outgoing messages.
+ // Notify ALL sending threads. A send thread may have
+ // added a CAS to the list of pending CASes due to
+ // a timeout and subsequently entered a wait state
+ // waiting for this notification. The CAS this thread
+ // was trying to deliver will be taken off the pending
+ // dispatch list and send to the service.
ThreadMonitor threadMonitor = null;
Iterator it = threadMonitorMap.entrySet().iterator();
- while( it.hasNext() ) {
- threadMonitor = ((Entry<Long, ThreadMonitor>)it.next()).getValue();
+ while (it.hasNext()) {
+ threadMonitor = ((Entry<Long, ThreadMonitor>) it.next()).getValue();
threadMonitor.getMonitor().release();
}
} else {
- // Asynch API used for sending outgoing messages.
- // If there are delayed CASes in the delegate's list of CASes
- // pending dispatch, send them all to the delegate now.
- while( serviceDelegate.getState() == Delegate.OK_STATE &&
- ( casReferenceId = serviceDelegate.removeOldestFromPendingDispatchList()) != null ) {
- ClientRequest cachedRequest = (ClientRequest)clientCache.get(casReferenceId);
- if ( cachedRequest != null ) {
+ // Asynch API used for sending outgoing messages.
+ // If there are delayed CASes in the delegate's list of CASes
+ // pending dispatch, send them all to the delegate now.
+ while (serviceDelegate.getState() == Delegate.OK_STATE
+ && (casReferenceId = serviceDelegate.removeOldestFromPendingDispatchList()) != null) {
+ ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
+ if (cachedRequest != null) {
sendCAS(cachedRequest.getCAS(), cachedRequest);
}
}
}
- if ( serviceDelegate.getCasPendingReplyListSize() > 0) {
+ if (serviceDelegate.getCasPendingReplyListSize() > 0) {
serviceDelegate.restartTimerForOldestCasInOutstandingList();
}
- // Handled Ping reply
+ // Handled Ping reply
return;
- }
- int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
- removeFromCache(uniqueIdentifier);
-
- try {
- if (AsynchAEMessage.Exception == payload)
- {
- ProcessTrace pt = new ProcessTrace_impl();
- UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
- Exception exception = retrieveExceptionFromMessage(message);
- clientSideJmxStats.incrementMetaErrorCount();
- status.addEventStatus("GetMeta", "Failed", exception);
- notifyListeners(null, status, AsynchAEMessage.GetMeta);
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_exception_msg_INFO",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), exception });
- }
+ }
+ int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+ removeFromCache(uniqueIdentifier);
+
+ try {
+ if (AsynchAEMessage.Exception == payload) {
+ ProcessTrace pt = new ProcessTrace_impl();
+ UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+ Exception exception = retrieveExceptionFromMessage(message);
+ clientSideJmxStats.incrementMetaErrorCount();
+ status.addEventStatus("GetMeta", "Failed", exception);
+ notifyListeners(null, status, AsynchAEMessage.GetMeta);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "handleMetadataReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_received_exception_msg_INFO",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
+ message.getStringProperty(AsynchAEMessage.CasReference), exception });
+ }
abort = true;
initialized = false;
- }
- else
- {
- // Check serialization supported by the service against client configuration.
- // If the client is configured to use Binary serialization *but* the service
- // doesnt support it, change the client serialization to xmi. Old services will
- // not return in a reply the type of serialization supported which implies "xmi".
- // New services *always* return "binary" as a default serialization. The client
- // however may still want to serialize messages using xmi though.
- if ( !message.propertyExists(AsynchAEMessage.Serialization)) {
- // Dealing with an old service here, check if there is a mismatch with the
- // client configuration. If the client is configured with binary serialization
- // override this and change serialization to "xmi".
- if ( getSerializationStrategy().equalsIgnoreCase("binary")) {
- System.out.println("\n\t***** WARNING: Service Doesn't Support Binary Serialization. Client Defaulting to XMI Serialization\n");
- // Override configured serialization
- setSerializationStrategy("xmi");
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_serialization_ovveride__WARNING", new Object[] { });
- }
- }
- String meta = ((TextMessage) message).getText();
- ByteArrayInputStream bis = new ByteArrayInputStream(meta.getBytes());
- XMLInputSource in1 = new XMLInputSource(bis, null);
- // Adam - store ResouceMetaData in field so we can return it from getMetaData().
- resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser().parseResourceMetaData(in1);
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_meta_reply_FINEST", new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), meta });
- }
- asynchManager.addMetadata(resourceMetadata);
- }
-
- } catch( Exception e) {
- throw e;
- } finally {
+ } else {
+ // Check serialization supported by the service against client configuration.
+ // If the client is configured to use Binary serialization *but* the service
+ // doesnt support it, change the client serialization to xmi. Old services will
+ // not return in a reply the type of serialization supported which implies "xmi".
+ // New services *always* return "binary" as a default serialization. The client
+ // however may still want to serialize messages using xmi though.
+ if (!message.propertyExists(AsynchAEMessage.Serialization)) {
+ // Dealing with an old service here, check if there is a mismatch with the
+ // client configuration. If the client is configured with binary serialization
+ // override this and change serialization to "xmi".
+ if (getSerializationStrategy().equalsIgnoreCase("binary")) {
+ System.out
+ .println("\n\t***** WARNING: Service Doesn't Support Binary Serialization. Client Defaulting to XMI Serialization\n");
+ // Override configured serialization
+ setSerializationStrategy("xmi");
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_serialization_ovveride__WARNING", new Object[] {});
+ }
+ }
+ String meta = ((TextMessage) message).getText();
+ ByteArrayInputStream bis = new ByteArrayInputStream(meta.getBytes());
+ XMLInputSource in1 = new XMLInputSource(bis, null);
+ // Adam - store ResouceMetaData in field so we can return it from getMetaData().
+ resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser()
+ .parseResourceMetaData(in1);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_handling_meta_reply_FINEST",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), meta });
+ }
+ asynchManager.addMetadata(resourceMetadata);
+ }
+
+ } catch (Exception e) {
+ throw e;
+ } finally {
getMetaSemaphore.release();
- }
- }
+ }
+ }
- protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int aCommand)
- {
- for (int i = 0; listeners != null && i < listeners.size(); i++)
- {
- UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
- switch( aCommand )
- {
- case AsynchAEMessage.GetMeta:
- statCL.initializationComplete(aStatus);
- break;
-
- case AsynchAEMessage.CollectionProcessComplete:
- statCL.collectionProcessComplete(aStatus);
- break;
-
- case AsynchAEMessage.Process:
- case AsynchAEMessage.Ping:
- statCL.entityProcessComplete(aCAS, aStatus);
- break;
-
- }
-
- }
- }
-
- protected void cancelTimer(String identifier)
- {
- ClientRequest request = null;
- if (clientCache.containsKey(identifier))
- {
- request = (ClientRequest) clientCache.get(identifier);
- if ( request != null )
- {
- request.cancelTimer();
- }
- }
- }
-
- private boolean isException( Message message ) throws Exception
- {
- int payload;
- if (message.propertyExists(AsynchAEMessage.Payload))
- {
- payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
- }
- else
- {
- throw new InvalidMessageException("Message Does not Contain Payload property");
- }
-
- return ( AsynchAEMessage.Exception == payload ? true : false);
- }
- private Exception retrieveExceptionFromMessage( Message message) throws Exception
- {
- Exception exception = null;
- if ( message instanceof ObjectMessage && ((ObjectMessage)message).getObject() instanceof Exception )
- {
- exception = (Exception) ((ObjectMessage) message).getObject();
- }
- else if ( message instanceof TextMessage )
- {
- exception = new UimaEEServiceException(((TextMessage)message).getText());
- }
- return exception;
- }
- private void handleProcessReplyFromSynchronousCall(ClientRequest cachedRequest, Message message) throws Exception
- {
- // Save reply message in the cache
- cachedRequest.setMessage(message);
- wakeUpSendThread(cachedRequest);
- }
+ protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int aCommand) {
+ for (int i = 0; listeners != null && i < listeners.size(); i++) {
+ UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
+ switch (aCommand) {
+ case AsynchAEMessage.GetMeta:
+ statCL.initializationComplete(aStatus);
+ break;
- private void wakeUpSendThread(ClientRequest cachedRequest) throws Exception {
- if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
- {
- ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
- // Unblock the sending thread so that it can complete processing
- // of the reply. The message has been stored in the cache and
- // when the thread wakes up due to notification below, it will
- // retrieve the reply and process it.
- if ( threadMonitor != null ) {
+ case AsynchAEMessage.CollectionProcessComplete:
+ statCL.collectionProcessComplete(aStatus);
+ break;
+
+ case AsynchAEMessage.Process:
+ case AsynchAEMessage.Ping:
+ statCL.entityProcessComplete(aCAS, aStatus);
+ break;
+
+ }
+
+ }
+ }
+
+ protected void cancelTimer(String identifier) {
+ ClientRequest request = null;
+ if (clientCache.containsKey(identifier)) {
+ request = (ClientRequest) clientCache.get(identifier);
+ if (request != null) {
+ request.cancelTimer();
+ }
+ }
+ }
+
+ private boolean isException(Message message) throws Exception {
+ int payload;
+ if (message.propertyExists(AsynchAEMessage.Payload)) {
+ payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+ } else {
+ throw new InvalidMessageException("Message Does not Contain Payload property");
+ }
+
+ return (AsynchAEMessage.Exception == payload ? true : false);
+ }
+
+ private Exception retrieveExceptionFromMessage(Message message) throws Exception {
+ Exception exception = null;
+ if (message instanceof ObjectMessage
+ && ((ObjectMessage) message).getObject() instanceof Exception) {
+ exception = (Exception) ((ObjectMessage) message).getObject();
+ } else if (message instanceof TextMessage) {
+ exception = new UimaEEServiceException(((TextMessage) message).getText());
+ }
+ return exception;
+ }
+
+ private void handleProcessReplyFromSynchronousCall(ClientRequest cachedRequest, Message message)
+ throws Exception {
+ // Save reply message in the cache
+ cachedRequest.setMessage(message);
+ wakeUpSendThread(cachedRequest);
+ }
+
+ private void wakeUpSendThread(ClientRequest cachedRequest) throws Exception {
+ if (threadMonitorMap.containsKey(cachedRequest.getThreadId())) {
+ ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest
+ .getThreadId());
+ // Unblock the sending thread so that it can complete processing
+ // of the reply. The message has been stored in the cache and
+ // when the thread wakes up due to notification below, it will
+ // retrieve the reply and process it.
+ if (threadMonitor != null) {
cachedRequest.setReceivedProcessCasReply();
threadMonitor.getMonitor().release();
}
}
-
- }
- /**
- * Handles a ServiceInfo message returned from the Cas Multiplier. The
- * primary purpose of this message is to provide the client with a dedicated
- * queue object where the client may send messages to the specific
- * CM service instance. An example of this would be a stop request that
- * client needs to send to the specific Cas Multiplier.
- *
- * @param message - message received from a service
- * @throws Exception
- */
- protected void handleServiceInfo(Message message ) throws Exception {
- String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
- if ( message.getJMSReplyTo() != null ) {
- List<DelegateEntry> outstandingCasList =
- serviceDelegate.getDelegateCasesPendingRepy();
- for( DelegateEntry entry : outstandingCasList) {
- if ( entry.getCasReferenceId().equals(casReferenceId)) {
- // The Cas is still being processed
- ClientRequest casCachedRequest =
- (ClientRequest)clientCache.get(casReferenceId);
- if ( casCachedRequest != null ) {
- System.out.println("Client Received ServiceInfo Notification from CAS Multiplier For CAS: "+casReferenceId);
- casCachedRequest.setFreeCasNotificationQueue(message.getJMSReplyTo());
- }
- return;
- }
- }
- }
- }
- /**
- * Handles response to Process CAS request. If the message originated in a service that is running in a separate jvm (remote),
- * deserialize the CAS and notify the application of the completed analysis via application listener.
- *
- * @param message -
- * jms message containing serialized CAS
- *
- * @throws Exception
- */
- protected void handleProcessReply(Message message, boolean doNotify, ProcessTrace pt) throws Exception
- {
- if ( !running )
- {
- return;
- }
- int payload = -1;
- String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
- // Determine the type of payload in the message (XMI,Cas Reference,Exception,etc)
- if (message.propertyExists(AsynchAEMessage.Payload))
- {
- payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
- }
- // Fetch entry from the client cache for a cas id returned from the service
- // The client cache maintains an entry for every outstanding CAS sent to the
- // service.
+
+ }
+
+ /**
+  * Processes a ServiceInfo message sent by a Cas Multiplier. Such a message gives the client a
+  * dedicated queue on which it can reach that specific CM service instance, for example to
+  * deliver a stop request. The reply-to queue of the message is recorded in the cached request of
+  * the CAS named in the message, provided that CAS is still pending a reply.
+  *
+  * @param message
+  *          - message received from a service
+  * @throws Exception
+  */
+ protected void handleServiceInfo(Message message) throws Exception {
+   String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
+   if (message.getJMSReplyTo() == null) {
+     // No reply-to queue in the message, nothing to record
+     return;
+   }
+   for (DelegateEntry pendingEntry : serviceDelegate.getDelegateCasesPendingRepy()) {
+     if (!pendingEntry.getCasReferenceId().equals(casReferenceId)) {
+       continue;
+     }
+     // The Cas is still being processed
+     ClientRequest pendingRequest = (ClientRequest) clientCache.get(casReferenceId);
+     if (pendingRequest != null) {
+       System.out.println("Client Received ServiceInfo Notification from CAS Multiplier For CAS: "
+           + casReferenceId);
+       pendingRequest.setFreeCasNotificationQueue(message.getJMSReplyTo());
+     }
+     // Only the first pending entry with a matching id is considered
+     return;
+   }
+ }
+
+ /**
+ * Handles response to Process CAS request. If the message originated in a service that is running
+ * in a separate jvm (remote), deserialize the CAS and notify the application of the completed
+ * analysis via application listener.
+ *
+ * @param message
+ * - jms message containing serialized CAS
+ *
+ * @throws Exception
+ */
+ protected void handleProcessReply(Message message, boolean doNotify, ProcessTrace pt)
+ throws Exception {
+ if (!running) {
+ return;
+ }
+ int payload = -1;
+ String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
+ // Determine the type of payload in the message (XMI,Cas Reference,Exception,etc)
+ if (message.propertyExists(AsynchAEMessage.Payload)) {
+ payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+ }
+ // Fetch entry from the client cache for a cas id returned from the service
+ // The client cache maintains an entry for every outstanding CAS sent to the
+ // service.
ClientRequest cachedRequest = null;
-
- if ( casReferenceId != null ) {
- cachedRequest = (ClientRequest)clientCache.get(casReferenceId);
- // Incremente number of replies
- if ( cachedRequest != null && casReferenceId.equals(cachedRequest.getCasReferenceId()) )
- {
+
+ if (casReferenceId != null) {
+ cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
+ // Incremente number of replies
+ if (cachedRequest != null && casReferenceId.equals(cachedRequest.getCasReferenceId())) {
// Received a reply, decrement number of outstanding CASes
long outstandingCasCount = outstandingCasRequests.decrementAndGet();
- if ( outstandingCasCount == 0) {
+ if (outstandingCasCount == 0) {
cpcReadySemaphore.release();
- }
+ }
}
}
- if (AsynchAEMessage.Exception == payload)
- {
- handleException(message, cachedRequest, true);
- return;
- }
- // If the Cas Reference id not in the message check if the message contains an
- // exception and if so, handle the exception and return.
- if ( casReferenceId == null )
- {
- return;
- }
- if ( serviceDelegate.getCasProcessTimeout() > 0) {
+ if (AsynchAEMessage.Exception == payload) {
+ handleException(message, cachedRequest, true);
+ return;
+ }
+ // If the Cas Reference id not in the message check if the message contains an
+ // exception and if so, handle the exception and return.
+ if (casReferenceId == null) {
+ return;
+ }
+ if (serviceDelegate.getCasProcessTimeout() > 0) {
serviceDelegate.cancelDelegateTimer();
}
-
- serviceDelegate.removeCasFromOutstandingList(casReferenceId);
- if ( message instanceof TextMessage && UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_process_reply_FINEST",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), message.toString()+((TextMessage) message).getText() });
- }
-
-
- if ( cachedRequest != null )
- {
- // Store the total latency for this CAS. The departure time is set right before the CAS
- // is sent to a service.
- cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
-
- // If the CAS was sent from a synchronous API sendAndReceive(), wake up the thread that
- // sent the CAS and process the reply
- if ( cachedRequest.isSynchronousInvocation() )
- {
- handleProcessReplyFromSynchronousCall(cachedRequest, message);
- }
- else
- {
- deserializeAndCompleteProcessingReply( casReferenceId, message, cachedRequest, pt, doNotify );
- }
- }
- else if ( message.propertyExists(AsynchAEMessage.InputCasReference) )
- {
- handleProcessReplyFromCasMultiplier(message, casReferenceId, payload);
- }
- else
- {
+
+ serviceDelegate.removeCasFromOutstandingList(casReferenceId);
+ if (message instanceof TextMessage
+ && UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "handleProcessReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_handling_process_reply_FINEST",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
+ message.getStringProperty(AsynchAEMessage.CasReference),
+ message.toString() + ((TextMessage) message).getText() });
+ }
+
+ if (cachedRequest != null) {
+ // Store the total latency for this CAS. The departure time is set right before the CAS
+ // is sent to a service.
+ cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
+
+ // If the CAS was sent from a synchronous API sendAndReceive(), wake up the thread that
+ // sent the CAS and process the reply
+ if (cachedRequest.isSynchronousInvocation()) {
+ handleProcessReplyFromSynchronousCall(cachedRequest, message);
+ } else {
+ deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, doNotify);
+ }
+ } else if (message.propertyExists(AsynchAEMessage.InputCasReference)) {
+ handleProcessReplyFromCasMultiplier(message, casReferenceId, payload);
+ } else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- // Most likely expired message. Already handled as timeout. Discard the message and move on to the next
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_expired_msg_INFO",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
- }
- }
- }
-
- private void handleProcessReplyFromCasMultiplier( Message message, String casReferenceId, int payload /*, ClientRequest inputCasCachedRequest*/) throws Exception
- {
- // Check if the message contains a CAS that was generated by a Cas Multiplier. If so,
- // verify that the message also includes an input CAS id and that such input CAS id
- // exists in the client's cache.
- // Fetch the input CAS Reference Id from which the CAS being processed was generated from
- String inputCasReferenceId =
- message.getStringProperty(AsynchAEMessage.InputCasReference);
- // Fetch an entry from the client cache for a given input CAS id. This would be an id
- // of the CAS that the client sent out to the service.
- ClientRequest inputCasCachedRequest =
- (ClientRequest)clientCache.get(inputCasReferenceId);
- if ( inputCasCachedRequest == null )
- {
+ // Most likely expired message. Already handled as timeout. Discard the message and move on
+ // to the next
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "handleProcessReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_received_expired_msg_INFO",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
+ message.getStringProperty(AsynchAEMessage.CasReference) });
+ }
+ }
+ }
+
+ private void handleProcessReplyFromCasMultiplier(Message message, String casReferenceId,
+ int payload /* , ClientRequest inputCasCachedRequest */) throws Exception {
+ // Check if the message contains a CAS that was generated by a Cas Multiplier. If so,
+ // verify that the message also includes an input CAS id and that such input CAS id
+ // exists in the client's cache.
+ // Fetch the input CAS Reference Id from which the CAS being processed was generated from
+ String inputCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
+ // Fetch an entry from the client cache for a given input CAS id. This would be an id
+ // of the CAS that the client sent out to the service.
+ ClientRequest inputCasCachedRequest = (ClientRequest) clientCache.get(inputCasReferenceId);
+ if (inputCasCachedRequest == null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- // Most likely expired message. Already handled as timeout. Discard the message and move on to the next
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleProcessReplyFromCasMultiplier", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_expired_msg_INFO",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference) });
- }
- return;
- }
- if (inputCasCachedRequest.isSynchronousInvocation()) {
- handleProcessReplyFromSynchronousCall(inputCasCachedRequest, message);
- }
- // Fetch the destination for Free CAS notification
- Destination freeCASNotificationDestination = message.getJMSReplyTo();
- if ( freeCASNotificationDestination != null )
- {
- TextMessage msg = createTextMessage();
- setReleaseCASMessage(msg, casReferenceId);
- // Create Message Producer for the Destination
- MessageProducer msgProducer =
- getMessageProducer(freeCASNotificationDestination);
- if ( msgProducer != null )
- {
- try
- {
- // Send FreeCAS message to a Cas Multiplier
- msgProducer.send(msg);
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReplyFromCasMultiplier", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_release_cas_FINEST",
- new Object[] { freeCASNotificationDestination, message.getStringProperty(AsynchAEMessage.CasReference) });
- }
- }
- catch( Exception e)
- {
- e.printStackTrace();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "handleProcessReplyFromCasMultiplier", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_error_while_sending_msg__WARNING", new Object[] { "Free Cas Temp Destination", e });
- }
- }
- }
- }
- CAS cas = null;
- if ( message instanceof TextMessage )
- {
- cas = deserializeCAS(((TextMessage) message).getText(), SHADOW_CAS_POOL );
- }
- else
- {
+ // Most likely expired message. Already handled as timeout. Discard the message and move on
+ // to the next
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "handleProcessReplyFromCasMultiplier",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_received_expired_msg_INFO",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
+ message.getStringProperty(AsynchAEMessage.CasReference) });
+ }
+ return;
+ }
+ if (inputCasCachedRequest.isSynchronousInvocation()) {
+ handleProcessReplyFromSynchronousCall(inputCasCachedRequest, message);
+ }
+ // Fetch the destination for Free CAS notification
+ Destination freeCASNotificationDestination = message.getJMSReplyTo();
+ if (freeCASNotificationDestination != null) {
+ TextMessage msg = createTextMessage();
+ setReleaseCASMessage(msg, casReferenceId);
+ // Create Message Producer for the Destination
+ MessageProducer msgProducer = getMessageProducer(freeCASNotificationDestination);
+ if (msgProducer != null) {
+ try {
+ // Send FreeCAS message to a Cas Multiplier
+ msgProducer.send(msg);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "handleProcessReplyFromCasMultiplier",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_sending_release_cas_FINEST",
+ new Object[] { freeCASNotificationDestination,
+ message.getStringProperty(AsynchAEMessage.CasReference) });
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "handleProcessReplyFromCasMultiplier", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_error_while_sending_msg__WARNING",
+ new Object[] { "Free Cas Temp Destination", e });
+ }
+ }
+ }
+ }
+ CAS cas = null;
+ if (message instanceof TextMessage) {
+ cas = deserializeCAS(((TextMessage) message).getText(), SHADOW_CAS_POOL);
+ } else {
long bodyLength = ((BytesMessage) message).getBodyLength();
- byte[] serializedCas = new byte[(int)bodyLength];
+ byte[] serializedCas = new byte[(int) bodyLength];
((BytesMessage) message).readBytes(serializedCas);
cas = deserializeCAS(serializedCas, SHADOW_CAS_POOL);
- }
- completeProcessingReply(cas, casReferenceId, payload, true, message, inputCasCachedRequest, null);
- }
-
- private boolean isShutdownException( Exception exception ) throws Exception
- {
- if ( exception != null )
- {
- if ( exception instanceof ServiceShutdownException ||
- exception.getCause() != null && exception.getCause()
- instanceof ServiceShutdownException )
- {
- return true;
- }
- }
- return false;
- }
- private void handleException( Message message, ClientRequest cachedRequest, boolean doNotify )
- throws Exception
- {
- Exception exception = retrieveExceptionFromMessage(message);
- if ( !isShutdownException(exception))
- {
- clientSideJmxStats.incrementProcessErrorCount();
- }
- if ( exception != null && cachedRequest != null ) {
- cachedRequest.setException(exception);
- if ( exception instanceof AnalysisEngineProcessException ||
- ( exception.getCause() != null &&
- ( exception.getCause() instanceof AnalysisEngineProcessException ||
- exception.getCause() instanceof ServiceShutdownException) ) ) {
[... 1987 lines stripped ...]