You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/08/22 20:53:06 UTC
svn commit: r688174 [1/3] - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main:
java/org/apache/uima/aae/ java/org/apache/uima/aae/client/
java/org/apache/uima/aae/controller/
java/org/apache/uima/aae/error/handler/ java/org/apache/uima/aa...
Author: eae
Date: Fri Aug 22 11:53:05 2008
New Revision: 688174
URL: http://svn.apache.org/viewvc?rev=688174&view=rev
Log:
UIMA-1147: commit Jerry's work (patches), merging the post1st branch into the trunk
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/AsynchAECasManager_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatus.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatusImpl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerCallbackListener.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/AggregateServiceInfo.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/PrimitiveServiceInfo.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfo.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfoMBean.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformanceMBean.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/AsynchAECasManager_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/AsynchAECasManager_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/AsynchAECasManager_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/AsynchAECasManager_impl.java Fri Aug 22 11:53:05 2008
@@ -96,7 +96,7 @@
contextName = aContextName;
setInitialized(true);
if (aPerformanceTuningSettings != null) {
- System.out.println("CasManager Iniatialized Cas Pool:" + aContextName + ". Cas Pool Size:"
+ System.out.println("CasManager Initialized Cas Pool:" + aContextName + ". Cas Pool Size:"
+ aCasPoolSize + " Initial Cas Heap Size:"
+ aPerformanceTuningSettings.get(UIMAFramework.CAS_INITIAL_HEAP_SIZE) + " cells");
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Fri Aug 22 11:53:05 2008
@@ -33,11 +33,13 @@
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.EventSubscriber;
import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.aae.monitor.statistics.DelegateStats;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.impl.OutOfTypeSystemData;
import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.flow.FinalStep;
import org.apache.uima.flow.Step;
import org.apache.uima.util.Level;
@@ -222,24 +224,36 @@
{
size = i;
}
- public synchronized void dumpContents()
+ public synchronized void dumpContents(String aControllerName)
{
+ int count=0;
if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
{
- int count=0;
Iterator it = cache.keySet().iterator();
-
StringBuffer sb = new StringBuffer("\n");
+
while( it.hasNext() )
{
String key = (String) it.next();
CacheEntry entry = (CacheEntry)cache.get(key);
count++;
- sb.append(key+"\n");
+ if ( entry.isSubordinate())
+ {
+ sb.append(key+ " Number Of Child CASes In Play:"+entry.getSubordinateCasInPlayCount()+" Parent CAS id:"+entry.getInputCasReferenceId());
+ }
+ else
+ {
+ sb.append(key+ " *** Input CAS. Number Of Child CASes In Play:"+entry.getSubordinateCasInPlayCount());
+ }
+ if ( entry.isWaitingForRelease() )
+ {
+ sb.append(" <<< Reached Final State in Controller:"+aControllerName);
+ }
+ sb.append("\n");
}
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"dumpContents", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_cache_entry_key__FINEST",
- new Object[] { count, sb.toString() });
+ new Object[] { aControllerName, count, sb.toString() });
sb.setLength(0);
/*
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
@@ -247,7 +261,30 @@
new Object[] { count });
*/
}
+ else if ( UIMAFramework.getLogger().isLoggable(Level.FINE) )
+ {
+ Iterator it = cache.keySet().iterator();
+ StringBuffer sb = new StringBuffer("\n");
+ int inFinalState=0;
+
+ while( it.hasNext() )
+ {
+ String key = (String) it.next();
+ CacheEntry entry = (CacheEntry)cache.get(key);
+ count++;
+ if ( entry.isWaitingForRelease() )
+ {
+ inFinalState++;
+ }
+ }
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "dumpContents", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_abbrev_cache_stats___FINE",
+ new Object[] { aControllerName, count, inFinalState });
+
+
+ }
}
+
public synchronized void remove(String aCasReferenceId)
{
if (aCasReferenceId != null && cache.containsKey(aCasReferenceId))
@@ -284,7 +321,7 @@
casRefEntry.deleteCAS();
}
- public CacheEntry[] getCacheEntriesForEndpoint(String anEndpointName )
+ public synchronized CacheEntry[] getCacheEntriesForEndpoint(String anEndpointName )
{
CacheEntry[] entries;
ArrayList list = new ArrayList();
@@ -375,8 +412,12 @@
CacheEntry casRefEntry = getEntry(aCasReferenceId);
return casRefEntry.getOtsd();
}
- private CacheEntry getEntry(String aCasReferenceId)
+ private synchronized CacheEntry getEntry(String aCasReferenceId)
{
+ if ( !cache.containsKey(aCasReferenceId))
+ {
+ return null;
+ }
return (CacheEntry) cache.get(aCasReferenceId);
}
public void addEndpoint( Endpoint anEndpoint, String aCasReferenceId)
@@ -414,31 +455,6 @@
CacheEntry casRefEntry = getEntry(aCasReferenceId);
return casRefEntry.getStartTime();
}
- public synchronized String register(String anInputCasRefId, long aCurrentSequence, CAS aCAS, MessageContext aMessageContext, OutOfTypeSystemData otsd)
- throws AsynchAEException
- {
- String casReferenceId = anInputCasRefId+"."+String.valueOf(aCurrentSequence);
-
- register(aCAS, aMessageContext, otsd, casReferenceId);
- return casReferenceId;
- }
- public synchronized String register(CAS aCAS, MessageContext aMessageContext, OutOfTypeSystemData otsd)
- throws AsynchAEException
- {
- //System.out.println("Register");
- String casReferenceId = idGenerator.nextId();
- register(aCAS, aMessageContext, otsd, casReferenceId);
- return casReferenceId;
- }
-
- public synchronized String register(CAS aCAS, MessageContext aMessageContext, XmiSerializationSharedData sharedData)
- throws AsynchAEException
- {
- //System.out.println("Register");
- String casReferenceId = idGenerator.nextId();
- register(aCAS, aMessageContext, sharedData, casReferenceId);
- return casReferenceId;
- }
public boolean entryExists(String aCasReferenceId)
{
try
@@ -455,39 +471,37 @@
}
return true;
}
- public synchronized void register(CAS aCAS, MessageContext aMessageContext, OutOfTypeSystemData otsd, String aCasReferenceId)
+
+ public synchronized CacheEntry register(CAS aCAS, MessageContext aMessageContext, OutOfTypeSystemData otsd)
throws AsynchAEException
{
- cache.put(aCasReferenceId, new CacheEntry(aCAS, aCasReferenceId, aMessageContext, otsd));
+// String casReferenceId = idGenerator.nextId();
+ return register(aCAS, aMessageContext, otsd, idGenerator.nextId());
+// return casReferenceId;
}
-
- public synchronized void register(CAS aCAS, MessageContext aMessageContext, XmiSerializationSharedData sharedData, String aCasReferenceId)
+ public synchronized CacheEntry register(CAS aCAS, MessageContext aMessageContext, XmiSerializationSharedData sharedData)
throws AsynchAEException
{
- cache.put(aCasReferenceId, new CacheEntry(aCAS, aCasReferenceId, aMessageContext, sharedData));
+// String casReferenceId = idGenerator.nextId();
+ return register(aCAS, aMessageContext, sharedData, idGenerator.nextId());
+// return casReferenceId;
}
- public void register(CAS aCAS, OutOfTypeSystemData otsd, String aCasReferenceId )
+ public synchronized CacheEntry register(CAS aCAS, MessageContext aMessageContext, OutOfTypeSystemData otsd, String aCasReferenceId)
throws AsynchAEException
{
- CacheEntry casRefEntry = getEntry(aCasReferenceId);
- if ( casRefEntry == null )
- {
- throw new AsynchAEException("Cas Not Found In CasManager Cache. CasReferenceId::"+aCasReferenceId+" is Invalid");
- }
- casRefEntry.setCas(aCAS, otsd);
+ return registerCacheEntry(aCasReferenceId, new CacheEntry(aCAS, aCasReferenceId, aMessageContext, otsd));
}
- public void register(CAS aCAS, String aCasReferenceId )
+ public synchronized CacheEntry register(CAS aCAS, MessageContext aMessageContext, XmiSerializationSharedData sharedData, String aCasReferenceId)
throws AsynchAEException
{
- CacheEntry casRefEntry = getEntry(aCasReferenceId);
- if ( casRefEntry == null )
- {
- throw new AsynchAEException("Cas Not Found In CasManager Cache. CasReferenceId::"+aCasReferenceId+" is Invalid");
- }
- casRefEntry.setCas(aCAS);
+ return registerCacheEntry(aCasReferenceId, new CacheEntry(aCAS, aCasReferenceId, aMessageContext, sharedData));
+ }
+ private CacheEntry registerCacheEntry( String aCasReferenceId, CacheEntry entry )
+ {
+ cache.put(aCasReferenceId, entry);
+ return entry;
}
-
public int getNumberOfParallelDelegates(String aCasReferenceId)
throws AsynchAEException
{
@@ -499,6 +513,36 @@
return casRefEntry.getNumberOfParallelDelegates();
}
+ public synchronized boolean hasNoSubordinates(String aCasReferenceId)
+ {
+ Iterator it = cache.keySet().iterator();
+ while( it.hasNext() )
+ {
+ String key = (String) it.next();
+ CacheEntry entry = (CacheEntry)cache.get(key);
+ if ( entry.getInputCasReferenceId() != null && entry.getInputCasReferenceId().equals(aCasReferenceId))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Endpoint getTopAncestorEndpoint(CacheEntry anEntry) throws Exception
+ {
+ if ( anEntry == null )
+ {
+ return null;
+ }
+
+ if ( anEntry.getInputCasReferenceId() == null )
+ {
+ return anEntry.getMessageOrigin();
+ }
+ CacheEntry parentEntry = getCacheEntryForCAS(anEntry.getInputCasReferenceId());
+ return getTopAncestorEndpoint(parentEntry);
+ }
+
public void setNumberOfParallelDelegates(int aParallelDelegateCount, String aCasReferenceId)
throws AsynchAEException
{
@@ -510,7 +554,7 @@
casRefEntry.setNumberOfParallelDelegates(aParallelDelegateCount);
}
- public CacheEntry getCacheEntryForCAS( String aCasReferenceId )
+ public synchronized CacheEntry getCacheEntryForCAS( String aCasReferenceId )
throws AsynchAEException
{
CacheEntry casRefEntry = getEntry(aCasReferenceId);
@@ -593,34 +637,23 @@
private int state = 0;
+ private long sequence = 0;
+
+ private Endpoint freeCasEndpoint;
+
+ private FinalStep step;
+
+ private boolean waitingForRealease;
protected CacheEntry(CAS aCas, String aCasReferenceId, MessageContext aMessageAccessor, OutOfTypeSystemData aotsd)
{
this(aCas, aCasReferenceId, aMessageAccessor);
messageAccessor = aMessageAccessor;
-/*
- cas = aCas;
- otsd = aotsd;
- if ( aMessageAccessor != null )
- {
- messageOrigin = aMessageAccessor.getEndpoint();
- }
- casReferenceId = aCasReferenceId;
-*/
}
protected CacheEntry(CAS aCas, String aCasReferenceId, MessageContext aMessageAccessor, XmiSerializationSharedData sdata)
{
this(aCas, aCasReferenceId, aMessageAccessor);
deserSharedData = sdata;
-/*
- cas = aCas;
- messageAccessor = aMessageAccessor;
- if ( aMessageAccessor != null )
- {
- messageOrigin = aMessageAccessor.getEndpoint();
- }
- casReferenceId = aCasReferenceId;
-*/
}
private CacheEntry(CAS aCas, String aCasReferenceId, MessageContext aMessageAccessor )
{
@@ -631,6 +664,17 @@
messageOrigin = aMessageAccessor.getEndpoint();
}
casReferenceId = aCasReferenceId;
+ try
+ {
+ if ( aMessageAccessor.propertyExists(AsynchAEMessage.CasSequence) )
+ {
+ sequence = aMessageAccessor.getMessageLongProperty(AsynchAEMessage.CasSequence);
+ }
+ }
+ catch( Exception e)
+ {
+ e.printStackTrace();
+ }
}
public String getCasReferenceId()
{
@@ -914,6 +958,41 @@
{
state = aState;
}
+ public long getCasSequence()
+ {
+ return sequence;
+ }
+ public void setCasSequence(long sequence)
+ {
+ this.sequence = sequence;
+ }
+
+ public void setFreeCasEndpoint( Endpoint aFreeCasEndpoint )
+ {
+ freeCasEndpoint = aFreeCasEndpoint;
+ }
+ public Endpoint getFreeCasEndpoint()
+ {
+ return freeCasEndpoint;
+ }
+
+ public void setFinalStep( FinalStep step )
+ {
+ this.step = step;
+ }
+ public FinalStep getFinalStep()
+ {
+ return step;
+ }
+ public void setWaitingForRelease(boolean flag)
+ {
+ waitingForRealease = flag;
+ }
+
+ public boolean isWaitingForRelease()
+ {
+ return waitingForRealease;
+ }
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java Fri Aug 22 11:53:05 2008
@@ -30,7 +30,6 @@
public void setServerUri(String aServerUri);
public String getInputQueueName();
public ServiceInfo getServiceInfo();
-// public void stop() throws Exception;
public boolean isStopped();
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java Fri Aug 22 11:53:05 2008
@@ -19,6 +19,7 @@
package org.apache.uima.aae;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.error.AsynchAEException;
@@ -46,7 +47,9 @@
public void sendReply( String aCasReferenceId, Endpoint anEndpoint ) throws AsynchAEException;
-// public void sendReply( AnalysisEngineMetaData anAnalysisEngineMetadata, Endpoint anEndpoint, boolean serialize ) throws AsynchAEException;
+ public void sendReply( CacheEntry entry, Endpoint anEndpoint ) throws AsynchAEException;
+
+ // public void sendReply( AnalysisEngineMetaData anAnalysisEngineMetadata, Endpoint anEndpoint, boolean serialize ) throws AsynchAEException;
public void sendReply( ProcessingResourceMetaData aProcessingResourceMetadata, Endpoint anEndpoint, boolean serialize ) throws AsynchAEException;
public void sendReply(Throwable t, String aCasReferenceId, Endpoint anEndpoint, int aCommand) throws AsynchAEException;
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatus.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatus.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatus.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatus.java Fri Aug 22 11:53:05 2008
@@ -23,5 +23,5 @@
public interface UimaASProcessStatus extends EntityProcessStatus {
public String getCasReferenceId();
-
+ public String getParentCasReferenceId();
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatusImpl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatusImpl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatusImpl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatusImpl.java Fri Aug 22 11:53:05 2008
@@ -47,15 +47,21 @@
public boolean isProcessed = true;
private String casReferenceId;
+
+ private String parentCasId;
public UimaASProcessStatusImpl(ProcessTrace p){
this(p,null);
}
public UimaASProcessStatusImpl(ProcessTrace p, String aCasReferenceId) {
+ this(p,aCasReferenceId,null);
+ }
+
+ public UimaASProcessStatusImpl(ProcessTrace p, String aCasReferenceId, String aParentCasReferenceId) {
prT = p;
casReferenceId = aCasReferenceId;
- }
-
+ parentCasId = aParentCasReferenceId;
+ }
public UimaASProcessStatusImpl(ProcessTrace p, boolean aSkip) {
prT = p;
isSkipped = aSkip;
@@ -143,4 +149,7 @@
public String getCasReferenceId() {
return casReferenceId;
}
+ public String getParentCasReferenceId() {
+ return parentCasId;
+ }
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java Fri Aug 22 11:53:05 2008
@@ -74,6 +74,7 @@
public final String ServerUri = "ServerURI";
public final String Endpoint = "Endpoint";
public final String CasPoolSize = "CasPoolSize";
+ public final String ShadowCasPoolSize ="ShadowCasPoolSize";
public static final String ReplyWindow = "ReplyWindow";
public static final String Timeout = "Timeout";
public static final String CpcTimeout = "CpcTimeout";
@@ -203,10 +204,10 @@
* It doesn't use call-backs through a registered application listener.
*
* @param aCAS - a CAS to analyze.
- *
+ * @return - a unique id assigned to the CAS
* @throws ResourceProcessException
*/
- public void sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException;
+ public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException;
/**
* Deploys a UIMA AS container and all services defined in provided deployment
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java Fri Aug 22 11:53:05 2008
@@ -31,11 +31,14 @@
import org.apache.uima.aae.jmx.ServiceErrors;
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.flow.FinalStep;
public interface AggregateAnalysisEngineController extends AnalysisEngineController
{
public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException;
+ public void mergeTypeSystem(String aTypeSystem, String fromDestination, String fromServer) throws AsynchAEException;
+
public void sendRequestForMetadataToRemoteDelegates() throws AsynchAEException;
public void addMessageOrigin( String aCasReferenceId, Endpoint anEndpoint );
@@ -60,6 +63,8 @@
public String lookUpDelegateKey( String aDelegateEndpointName );
+ public String lookUpDelegateKey( String aDelegateEndpointName, String server );
+
public UimaContext getChildUimaContext( String aDelegateEndpointName ) throws Exception;
// public void retryCAS( String aCasReferenceId, Endpoint anEndpoint )throws AsynchAEException;
@@ -79,7 +84,7 @@
public String getLastDelegateKeyFromFlow(String anInputCasReferenceId);
- public boolean sendRequestToReleaseCas();
+// public boolean sendRequestToReleaseCas();
public void registerChildController( AnalysisEngineController aChildController, String aDelegateKey) throws Exception;
@@ -105,5 +110,7 @@
public ServicePerformance getServicePerformance(String aDelegateKey );
- public boolean decrementCasSubordinateCount( CacheEntry aParentCasCacheEntry );
+// public boolean decrementCasSubordinateCount( CacheEntry aParentCasCacheEntry );
+
+ public void finalStep( FinalStep aStep, String aCasReferenceId);
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Fri Aug 22 11:53:05 2008
@@ -84,7 +84,8 @@
private Map flowMap = new HashMap();
- protected ConcurrentHashMap destinationMap;
+// protected ConcurrentHashMap destinationMap;
+ private volatile ConcurrentHashMap destinationMap;
private Map destinationToKeyMap;
@@ -114,15 +115,14 @@
private boolean initialized = false;
- private int cmCasPoolSizeDelta = 0;
-
private int counter = 0;
private Object counterMonitor = new Object();
protected List childControllerList = new ArrayList();
+// private List childControllerList = new ArrayList();
- protected Map delegateStats = new HashMap();
+ private Map delegateStats = new HashMap();
private AggregateServiceInfo serviceInfo = null;
@@ -130,6 +130,13 @@
private boolean requestForMetaSentToRemotes = false;
+ private Object mux = new Object();
+
+ private boolean isIdle = true;
+
+ private long lastUpdate = System.nanoTime();
+
+ private long totalIdleTime = System.nanoTime();
private ConcurrentHashMap<String, Object[]> delegateStatMap =
new ConcurrentHashMap();
@@ -189,6 +196,23 @@
public void addMessageOrigin(String aCasReferenceId, Endpoint anEndpoint)
{
originMap.put(aCasReferenceId, anEndpoint);
+ if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
+ {
+ Iterator it = originMap.keySet().iterator();
+ StringBuffer sb = new StringBuffer();
+ while( it.hasNext())
+ {
+ String key = (String) it.next();
+ Endpoint e = (Endpoint) originMap.get(key);
+ if ( e != null )
+ {
+ sb.append("\t\nCAS:"+key+" Origin:"+e.getEndpoint());
+ }
+ }
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "addMessageOrigin", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_dump_msg_origin__FINE",
+ new Object[] {getComponentName(), sb.toString()});
+ }
}
@@ -361,7 +385,6 @@
String key = lookUpDelegateKey( aDelegateKey);
if ( endpoint == null )
{
-
endpoint = (Endpoint) destinationMap.get(key);
if ( endpoint == null )
{
@@ -377,7 +400,6 @@
if (sendReply && allDelegatesCompletedCollection() && getClientEndpoint() != null)
{
-
sendCpcReply();
}
}
@@ -403,9 +425,33 @@
}
// Log this controller's stats
logStats(getComponentName(),servicePerformance);
+
+ endProcess(AsynchAEMessage.Process);
+
getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, getClientEndpoint());
clientEndpoint = null;
clearStats();
+
+ Map delegates = ((AggregateAnalysisEngineController)this).getDestinations();
+ Set set = delegates.entrySet();
+ for( Iterator it = set.iterator(); it.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry)it.next();
+ Endpoint endpoint = (Endpoint)entry.getValue();
+ if ( endpoint != null )
+ {
+ // Fetch stats for the delegate
+ ServicePerformance delegatePerformanceStats =
+ ((AggregateAnalysisEngineController)this).
+ getDelegateServicePerformance((String)entry.getKey());
+ if ( delegatePerformanceStats != null )
+ {
+ //delegatePerformanceStats.reset();
+ }
+ }
+ }
+ //getServicePerformance().reset();
+
}
/**
*
@@ -432,7 +478,7 @@
if (!shownOnce)
{
shownOnce = true;
- cache.dumpContents();
+ cache.dumpContents(getComponentName());
}
if (cache.isEmpty())
@@ -473,7 +519,7 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cpc__FINEST", new Object[] { getName() });
- getInProcessCache().dumpContents();
+ getInProcessCache().dumpContents(getComponentName());
cacheClientEndpoint(anEndpoint);
@@ -685,7 +731,7 @@
// Save the subordinate Flow Object in a cache. Flow exists in the
// cache until the CAS is fully processed or it is
- // explicitely deleted when processing of this CAS cannot continue
+ // explicitly deleted when processing of this CAS cannot continue
synchronized( flowMap )
{
flowMap.put(aNewCasReferenceId, flow);
@@ -896,44 +942,218 @@
remoteEndpoints[i].initialize();
remoteEndpoints[i].setController(this);
String key = lookUpDelegateKey(remoteEndpoints[i].getEndpoint());
- Endpoint endpoint = ((Endpoint) destinationMap.get(key));
- if ( key != null && endpoint != null)
+ if ( key != null && destinationMap.containsKey(key))
{
- ServiceInfo serviceInfo = endpoint.getServiceInfo();
- PrimitiveServiceInfo pServiceInfo = new PrimitiveServiceInfo();
- pServiceInfo.setBrokerURL(serviceInfo.getBrokerURL());
- pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
- pServiceInfo.setState(serviceInfo.getState());
- pServiceInfo.setAnalysisEngineInstanceCount(1);
-
- registerWithAgent(pServiceInfo, super.getManagementInterface().getJmxDomain()
- +super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+serviceInfo.getLabel());
+ Endpoint endpoint = ((Endpoint) destinationMap.get(key));
+ if ( key != null && endpoint != null)
+ {
+ ServiceInfo serviceInfo = endpoint.getServiceInfo();
+ PrimitiveServiceInfo pServiceInfo = new PrimitiveServiceInfo();
+ pServiceInfo.setBrokerURL(serviceInfo.getBrokerURL());
+ pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
+ pServiceInfo.setState(serviceInfo.getState());
+ pServiceInfo.setAnalysisEngineInstanceCount(1);
+
+ registerWithAgent(pServiceInfo, super.getManagementInterface().getJmxDomain()
+ +super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+serviceInfo.getLabel());
+
+ ServicePerformance servicePerformance = new ServicePerformance();
+ //servicePerformance.setIdleTime(System.nanoTime());
+ servicePerformance.setRemoteDelegate();
+
+ registerWithAgent(servicePerformance, super.getManagementInterface().getJmxDomain()+super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+servicePerformance.getLabel());
+
+ ServiceErrors serviceErrors = new ServiceErrors();
+
+ registerWithAgent(serviceErrors, super.getManagementInterface().getJmxDomain()+super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+serviceErrors.getLabel());
+ remoteIndex++;
+
+ serviceErrorMap.put(key, serviceErrors);
+ Object[] delegateStatsArray =
+ new Object[] { pServiceInfo, servicePerformance, serviceErrors };
- ServicePerformance servicePerformance = new ServicePerformance();
-
- registerWithAgent(servicePerformance, super.getManagementInterface().getJmxDomain()+super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+servicePerformance.getLabel());
+ delegateStatMap.put( key, delegateStatsArray);
+ }
+ dispatchMetadataRequest(remoteEndpoints[i]);
+ }
+ }
+ }
+ }
- ServiceErrors serviceErrors = new ServiceErrors();
-
- registerWithAgent(serviceErrors, super.getManagementInterface().getJmxDomain()+super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+serviceErrors.getLabel());
- remoteIndex++;
+ public void finalStep(FinalStep aStep, String aCasReferenceId)
+ {
+ Endpoint endpoint=null;
+ boolean casDropped = false;
+
+ boolean subordinateCasInPlayCountDecremented=false;
+ CacheEntry cacheEntry = null;
+ Endpoint freeCasEndpoint = null;
+
+ try
+ {
+ // Get entry from the cache for a given CAS Id. This throws an exception if
+ // an entry doesn't exist in the cache
+ cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+ // Mark the entry to indicate that the CAS reached a final step. This CAS
+ // may still have children and will not be returned to the client until
+ // all of them are fully processed. This state info will aid in the
+ // internal bookkeeping when the final child is processed.
+ cacheEntry.setState(CacheEntry.FINAL_STATE);
+
+ }
+ catch(Exception e)
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+ return;
+ }
+ // Found the entry in the cache for a given CAS id
+ try
+ {
+ endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
+ // Check if this CAS has children (subordinates)
+ if ( getInProcessCache().hasNoSubordinates(aCasReferenceId) == false)
+ {
+ // This CAS has child CASes still in play. This CAS will remain in the cache
+ // until all its children are fully processed.
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step_parent_cas_child_count__FINEST", new Object[] { getComponentName(),aCasReferenceId,cacheEntry.getSubordinateCasInPlayCount()});
+ // Leave input CAS in pending state. It will be returned to the client
+ // *only* if the last subordinate CAS is fully processed.
+ cacheEntry.setPendingReply(true);
+ cacheEntry.setFinalStep(aStep);
+ // Done here. There are subordinate CASes still being processed.
+ return;
+ }
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step_parent_cas_no_children__FINEST", new Object[] { getComponentName(),aCasReferenceId});
- serviceErrorMap.put(key, serviceErrors);
- Object[] delegateStatsArray =
- new Object[] { pServiceInfo, servicePerformance, serviceErrors };
+ // If this CAS has a parent, save the destination of a CM that produced it and where we may need to send Free Cas Notification
+ if ( cacheEntry.isSubordinate())
+ {
+ freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
+ }
- delegateStatMap.put( key, delegateStatsArray);
+ CacheEntry parentCASCacheEntry = null;
+ // If this service is not a Cas Multiplier and a given CAS has a parent
+ // decrement a number of children the parent CAS has in play. The child
+ // CAS will be dropped. If this aggregate *is* a cas multiplier, the client
+ // will send it a Release CAS request and only then the child count of the
+ // parent CAS can be decremented.
+ if ( cacheEntry.isSubordinate() && isTopLevelComponent())
+ {
+ // This is a subordinate CAS. First get cache entry for the input (parent) CAS
+ parentCASCacheEntry = getInProcessCache().getCacheEntryForCAS(cacheEntry.getInputCasReferenceId());
+ parentCASCacheEntry.decrementSubordinateCasInPlayCount();
+ // Save this state in case an exception happens below, the error handler will not decrement children again
+ subordinateCasInPlayCountDecremented = true;
+ }
+ Endpoint clientEndpoint = null;
+ // If the CAS was generated by this component but the Flow Controller wants to drop it OR this component
+ // is not a Cas Multiplier
+ if ( forceToDropTheCas( cacheEntry, aStep ) )
+ {
+ if ( cacheEntry.isReplyReceived() ) //|| isTopLevelComponent())
+ {
+ // Drop the CAS and remove cache entry for it
+ dropCAS(aCasReferenceId, true);
+ casDropped = true;
+ // If debug level=FINEST dump the entire cache
+ getInProcessCache().dumpContents(getComponentName());
}
- dispatchMetadataRequest(remoteEndpoints[i]);
+ }
+ else
+ {
+ if ( cacheEntry.isSubordinate())
+ {
+ cacheEntry.setWaitingForRelease(true);
+ }
+ // Send a reply to the Client. If the CAS is an input CAS it will be dropped
+ clientEndpoint = replyToClient( cacheEntry );
+ }
+ // Now check if the CAS's parent CAS is ready for a finalStep. The parent CAS may
+ // have been processed already but it is cached since its children are still
+ // in play.
+ if ( releaseParentCas(casDropped, clientEndpoint, parentCASCacheEntry) )
+ {
+ // All subordinate CASes have been processed. Process the parent CAS recursively.
+ finalStep(parentCASCacheEntry.getFinalStep(), parentCASCacheEntry.getCasReferenceId());
}
}
+ catch( Exception e)
+ {
+ HashMap map = new HashMap();
+ map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
+ map.put(AsynchAEMessage.CasReference, aCasReferenceId);
+ // If the subordinate count has been decremented, let the error handler know
+ // so that it doesn't decrement the count again. The default action in the
+ // error handler is to decrement number of subordinates responding. An exception
+ // that is not subject to retry will be counted as a response.
+ if (subordinateCasInPlayCountDecremented)
+ {
+ map.put(AsynchAEMessage.SkipSubordinateCountUpdate, true);
+ }
+ if ( endpoint != null )
+ {
+ map.put(AsynchAEMessage.Endpoint, endpoint);
+ }
+ handleError(map, e);
+ }
+ finally
+ {
+ removeMessageOrigin(aCasReferenceId);
+ dropStats(aCasReferenceId, super.getName());
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step_show_internal_stats__FINEST", new Object[] { getName(), flowMap.size(),getInProcessCache().getSize(),originMap.size(), super.statsMap.size()});
+ // freeCasEndpoint is a special endpoint for sending Free CAS Notification.
+ if ( casDropped && freeCasEndpoint != null )
+ {
+ freeCasEndpoint.setReplyEndpoint(true);
+ try
+ {
+ // send Free CAS Notification to a Cas Multiplier
+ getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId, freeCasEndpoint);
+ }
+ catch( Exception e)
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+ }
+ }
+ }
+ }
+ private boolean releaseParentCas(boolean casDropped, Endpoint clientEndpoint, CacheEntry parentCASCacheEntry)
+ {
+ return (
+ (casDropped || (clientEndpoint != null && !clientEndpoint.isRemote() ))
+ && parentCASCacheEntry != null
+ && parentCASCacheEntry.isReplyReceived()
+ && parentCASCacheEntry.isPendingReply()
+ && parentCASCacheEntry.getState() == CacheEntry.FINAL_STATE
+ && parentCASCacheEntry.getSubordinateCasInPlayCount() == 0
+ );
}
- private void finalStep(FinalStep aStep, String aCasReferenceId)// throws AsynchAEException
+ private boolean forceToDropTheCas( CacheEntry cacheEntry, FinalStep aStep)
+ {
+ // Get the key of the Cas Producer
+ String casProducer = cacheEntry.getCasProducerAggregateName();
+ // CAS is considered new from the point of view of this service IF it was produced by it
+ boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(casProducer));
+ // If the CAS was generated by this component but the Flow Controller wants to drop the CAS OR this component
+ // is not a Cas Multiplier
+ if ( isNewCas && ( aStep.getForceCasToBeDropped() || !isCasMultiplier()) )
+ {
+ return true;
+ }
+ return false;
+ }
+
+ /*
+ protected void finalStep(FinalStep aStep, String aCasReferenceId)// throws AsynchAEException
{
Endpoint endpoint=null;
boolean subordinateCasInPlayCountDecremented=false;
CacheEntry cacheEntry = null;
+ Endpoint freeCasEndpoint = null;
try
{
cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
@@ -944,7 +1164,8 @@
}
catch(Exception e)
{
- return;
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+ return;
}
try
@@ -972,26 +1193,26 @@
// Leave input CAS in pending state. It will be returned to the client
// *only* if the last subordinate CAS is fully processed.
cacheEntry.setPendingReply(true);
- // Done here. There are subordinate CASes still being processed.
+ cacheEntry.setFinalStep(aStep);
+ // Done here. There are subordinate CASes still being processed.
return;
}
else
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step_parent_cas_no_children__FINEST", new Object[] { getComponentName(),aCasReferenceId});
-
// All subordinates have been fully processed. Set the flag so that
// the input is returned back to the client.
replyWithInputCAS = true;
}
}
}
- else //if ( isTopLevelComponent())
+ else
{
// This is a subordinate CAS. First get cache entry for the input (parent) CAS
parentCASCacheEntry =
getInProcessCache().getCacheEntryForCAS(cacheEntry.getInputCasReferenceId());
- if ( getMessageOrigin(aCasReferenceId) == null )
+ if ( getMessageOrigin(aCasReferenceId) == null && !isCasMultiplier())
{
replyWithInputCAS = decrementCasSubordinateCount( parentCASCacheEntry);
if ( parentCASCacheEntry != null )
@@ -1006,9 +1227,8 @@
{
replyWithInputCAS = true;
}
-
}
-
+ freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
// Cas Processing has been completed. Check if the CAS should be sent to
// the client.
// Any of the two following conditions will prevent this aggregate from
@@ -1025,49 +1245,27 @@
// New CASes must be dropped if aggregate doesn't output them or if flow controller has ActionAfterCasMultiplier="drop"
if (isNewCas && (aStep.getForceCasToBeDropped() || !aggregateMetadata.getOperationalProperties().getOutputsNewCASes()))
{
+
endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
if ( cacheEntry.isReplyReceived())
{
dropCAS(aCasReferenceId, true);
}
-
- if ( parentCASCacheEntry != null //&& parentCASCacheEntry.isSubordinate()
- && parentCASCacheEntry.isReplyReceived()
- && parentCASCacheEntry.getState() == CacheEntry.FINAL_STATE
- && parentCASCacheEntry.getSubordinateCasInPlayCount() == 0)
- {
- // All subordinate CASes have been processed. Process the parent
- // CAS recursively.
- finalStep(aStep, parentCASCacheEntry.getCasReferenceId());
- }
- }
-/*
- if ( replyWithInputCAS && parentCASCacheEntry != null )
- {
- // Reply with the input CAS
- replyToClient( parentCASCacheEntry.getCasReferenceId() );
- }
- else
- {
- replyToClient( aCasReferenceId );
- }
-*/
- if ( replyWithInputCAS && getMessageOrigin(aCasReferenceId) != null)
- {
- // Reply with the input CAS
- replyToClient( aCasReferenceId );
- }
-
- String casMultiplierKey = cacheEntry.getCasMultiplierKey();
- if ( isNewCas && casMultiplierKey != null ) //&& cacheEntry.shouldSendRequestToFreeCas())
- {
- endpoint = lookUpEndpoint(casMultiplierKey, true);
- if ( endpoint != null && endpoint.isRemote() && endpoint.isCasMultiplier() ) //&& cacheEntry.shouldSendRequestToFreeCas() )
- {
- endpoint.setEndpoint(endpoint.getEndpoint()+"__CasSync");
- getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, endpoint );
- }
- endpoint = null;
+ }
+ else if (aggregateMetadata.getOperationalProperties().getOutputsNewCASes() ||
+ ( replyWithInputCAS && getMessageOrigin(aCasReferenceId) != null) )
+ {
+ // Send a reply to the Client. If the CAS is an input CAS it will be dropped
+ replyToClient( aCasReferenceId, cacheEntry );
+ }
+ if ( parentCASCacheEntry != null
+ && parentCASCacheEntry.isReplyReceived()
+ && parentCASCacheEntry.getState() == CacheEntry.FINAL_STATE
+ && parentCASCacheEntry.getSubordinateCasInPlayCount() == 0)
+ {
+ // All subordinate CASes have been processed. Process the parent
+ // CAS recursively.
+ finalStep(aStep, parentCASCacheEntry.getCasReferenceId());
}
removeMessageOrigin(aCasReferenceId);
dropStats(aCasReferenceId, super.getName());
@@ -1094,16 +1292,27 @@
}
handleError(map, e);
}
-/*
finally
{
- if ( aCasReferenceId != null && originMap.containsKey(aCasReferenceId))
+ // freeCasEndpoint is a special endpoint for sending Free CAS Notification.
+ // This endpoint will be set for each CAS generated in a Cas Multiplier.
+ if ( !isCasMultiplier() && freeCasEndpoint != null )
{
- originMap.remove(aCasReferenceId);
+ freeCasEndpoint.setReplyEndpoint(true);
+ try
+ {
+ // send Free CAS Notification to a Cas Multiplier
+ getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId, freeCasEndpoint);
+ }
+ catch( Exception e)
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+ }
}
}
-*/
}
+ */
+/*
public boolean decrementCasSubordinateCount( CacheEntry aParentCasCacheEntry )
{
if ( aParentCasCacheEntry != null )
@@ -1128,33 +1337,43 @@
}
return false;
}
- private void replyToClient(String aCasReferenceId ) throws Exception
+*/
+// private void replyToClient(String aCasReferenceId, CacheEntry cacheEntry ) throws Exception
+ private Endpoint replyToClient(CacheEntry cacheEntry ) throws Exception
{
- Endpoint endpoint;
+ Endpoint endpoint = null;
// Get the endpoint that represents a client that send the request
// to this service. If the first arg to getEndpoint() is null, the method
// should return the origin.
if (isTopLevelComponent())
{
- endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
+ if ( cacheEntry.isSubordinate()) //getCasSequence() > 0 )
+ {
+ endpoint = getInProcessCache().getTopAncestorEndpoint(cacheEntry);
+// endpoint = getInProcessCache().getEndpoint(null, cacheEntry.getInputCasReferenceId());
+ }
+ else
+ {
+ endpoint = getInProcessCache().getEndpoint(null, cacheEntry.getCasReferenceId());
+ }
}
else
{
- endpoint = getMessageOrigin(aCasReferenceId);
- dropFlow(aCasReferenceId, false);
+ endpoint = getMessageOrigin(cacheEntry.getCasReferenceId());
+ dropFlow(cacheEntry.getCasReferenceId(), false);
}
if ( endpoint != null )
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step__FINEST", new Object[] { aCasReferenceId, (double) (System.nanoTime() - endpoint.getEntryTime()) / (double) 1000000 });
+ "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step__FINEST", new Object[] { cacheEntry.getCasReferenceId(), (double) (System.nanoTime() - endpoint.getEntryTime()) / (double) 1000000 });
if (endpoint.getEndpoint() == null)
{
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_reply_destination__INFO", new Object[] { aCasReferenceId });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_reply_destination__INFO", new Object[] { cacheEntry.getCasReferenceId() });
HashMap map = new HashMap();
map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
- map.put(AsynchAEMessage.CasReference, aCasReferenceId);
+ map.put(AsynchAEMessage.CasReference, cacheEntry.getCasReferenceId());
handleError(map, new UnknownDestinationException());
}
@@ -1164,23 +1383,40 @@
if ( !isStopped() )
{
- // Send response to the given endpoint
- getOutputChannel().sendReply(aCasReferenceId, endpoint);
+ // Check if this CAS is new, meaning it has a parent and this component is a Cas Multiplier
+ if ( cacheEntry.isSubordinate() && isCasMultiplier())
+ {
+ // Add the generated CAS to the outstanding CAS Map. Client notification will release
+ // this CAS back to its pool
+ synchronized(syncObject)
+ {
+ cmOutstandingCASes.put(cacheEntry.getCasReferenceId(),cacheEntry.getCasReferenceId());
+ }
+
+ // Send response to a given endpoint
+ //getOutputChannel().sendReply(cacheEntry.getCas(), cacheEntry.getInputCasReferenceId(), aCasReferenceId, endpoint, cacheEntry.getCasSequence());
+ getOutputChannel().sendReply(cacheEntry, endpoint);
+ }
+ else
+ {
+ // Send response to a given endpoint
+ getOutputChannel().sendReply(cacheEntry.getCasReferenceId(), endpoint);
+ }
}
}
- // If the destination for the reply is in the same jvm dont remove
- // the entry from the cache. The client may need to retrive CAS by reference
- // to do some post-processing. The client will remove the entry when done
- // post-processing CAS.
- if ( !endpoint.getServerURI().startsWith("vm:"))
- {
- // Message was fully processed, remove state info related to the
- // previous CAS from the cache
- InProcessCache cache = getInProcessCache();
-
- dropCAS(aCasReferenceId, true);
+ // Drop the CAS only if the client is remote and the CAS is an input CAS.
+ // If this CAS has a parent the client will send Release CAS notification to release the CAS.
+ if ( endpoint.isRemote() && !cacheEntry.isSubordinate())
+ {
+ dropCAS(cacheEntry.getCasReferenceId(), true);
+ // If the cache is empty change the state of the Aggregate to idle
+ if ( getInProcessCache().isEmpty() )
+ {
+ endProcess(AsynchAEMessage.Process);
+ }
}
}
+ return endpoint;
}
private void executeFlowStep(FlowContainer aFlow, String aCasReferenceId, boolean newCAS) throws AsynchAEException
{
@@ -1228,6 +1464,19 @@
}
else if (step instanceof FinalStep)
{
+ // Special case: check if this CAS has just been produced by a Cas Multiplier.
+ // If so, we received a new CAS but there are no delegates in the pipeline.
+ // The CM was the last in the flow. In this case, set a property in the cache
+ // to simulate receipt of the reply to this CAS. This is so that the CAS is
+ // released in the finalStep() when the Aggregate is not a Cas Multiplier.
+ if ( newCAS)
+ {
+ CacheEntry entry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+ if ( entry != null )
+ {
+ entry.setReplyReceived();
+ }
+ }
finalStep((FinalStep) step, aCasReferenceId);
}
@@ -1301,13 +1550,58 @@
public String lookUpDelegateKey(String anEndpointName)
{
-
+ return lookUpDelegateKey(anEndpointName, null);
+ }
+ /**
+ * Returns a delegate key given an endpoint (queue) name and a server uri.
+ * If a server is null, only the endpoint name will be used for matching.
+ */
+ public String lookUpDelegateKey(String anEndpointName, String server)
+ {
+ String key = null;
if (destinationToKeyMap.containsKey(anEndpointName))
{
- return (String) destinationToKeyMap.get(anEndpointName);
+ Set keys = destinationMap.keySet();
+ Iterator it = keys.iterator();
+ // Find an endpoint for the GetMeta reply. To succeed, match the endpoint (queue) name
+ // as well as the server URI. We allow endpoints managed by different servers to have
+ // the same queue name.
+ // iterate over all endpoints until a match [queue,server] is found.
+ while( it.hasNext())
+ {
+ key = (String)it.next();
+ Endpoint_impl endp = (Endpoint_impl) destinationMap.get(key);
+
+ // Check if a queue name matches
+ if ( endp != null && endp.getEndpoint().equalsIgnoreCase(anEndpointName))
+ {
+ // Check if server match is requested as well
+ if ( server != null )
+ {
+ // server URIs must match
+ if ( endp.getServerURI() != null && endp.getServerURI().equalsIgnoreCase(server) )
+ {
+ // found a match for [queue,server]
+ break;
+ }
+ // Not found yet. Reset the key
+ key = null;
+ continue;
+ }
+ // found a match for [queue]
+ break;
+ }
+ // Not found yet. Reset the key
+ key = null;
+ }
}
- return null;
+// if (destinationToKeyMap.containsKey(anEndpointName))
+// {
+// return (String) destinationToKeyMap.get(anEndpointName);
+// }
+
+ return key;
}
public Endpoint lookUpEndpoint(String anAnalysisEngineKey, boolean clone) throws AsynchAEException
@@ -1354,13 +1648,34 @@
}
return null;
}
-
public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException
+ {
+ mergeTypeSystem(aTypeSystem, fromDestination, null);
+ }
+ public void mergeTypeSystem(String aTypeSystem, String fromDestination, String fromServer) throws AsynchAEException
{
try
{
- String key = findKeyForValue(fromDestination);
- Endpoint_impl endpoint = (Endpoint_impl) destinationMap.get(key);
+ Set keys = destinationMap.keySet();
+ Iterator it = keys.iterator();
+ Endpoint_impl endpoint = null;
+ String key = null;
+ // Find an endpoint for the GetMeta reply. To succeed, match the endpoint (queue) name
+ // as well as the server URI. We allow endpoints managed by different servers to have
+ // the same queue name.
+ // iterate over all endpoints until a match [queue,server] is found.
+ while( it.hasNext())
+ {
+ key = (String)it.next();
+ Endpoint_impl endp = (Endpoint_impl) destinationMap.get(key);
+
+ if ( endp.getServerURI() != null && endp.getServerURI().equalsIgnoreCase(fromServer) && endp.getEndpoint().equalsIgnoreCase(fromDestination))
+ {
+ endpoint = endp;
+ break;
+ }
+
+ }
if (endpoint == null)
{
@@ -1373,6 +1688,7 @@
endpoint.cancelTimer();
boolean collocatedAggregate = false;
ResourceMetaData resource = null;
+ ServiceInfo remoteDelegateServiceInfo = null;
if (aTypeSystem.trim().length() > 0)
{
if ( endpoint.isRemote() )
@@ -1395,29 +1711,11 @@
{
endpoint.setIsCasMultiplier(true);
remoteCasMultiplierList.add(key);
- if ( endpoint.isRemote())
- {
- int remoteCasPoolSize = 0;
- Object o = null;
- if ( ( o = ((ProcessingResourceMetaData) resource).getConfigurationParameterSettings().getParameterValue(AnalysisEngineController.CasPoolSize)) != null )
- {
- remoteCasPoolSize = ((Integer)o).intValue();
- System.out.println(">>>>>>>>>>>>>> Remote CAS Pool Size:::"+remoteCasPoolSize);
- if ( remoteCasPoolSize > endpoint.getShadowPoolSize() )
- {
- System.out.println(">>>>> Remote Cas Multiplier Cas Pool Size Exceeds the Size of the Local Cas Pool Size <<<<<<");
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "mergeTypeSystem", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_invalid_cp_size__CONFIG", new Object[] {getName(), fromDestination, remoteCasPoolSize, endpoint.getShadowPoolSize() });
- throw new ResourceConfigurationException(UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,"UIMAEE_invalid_cp_size__CONFIG", new Object[] {getName(), fromDestination, remoteCasPoolSize, endpoint.getShadowPoolSize()});
- }
- cmCasPoolSizeDelta = endpoint.getShadowPoolSize()-remoteCasPoolSize;
- }
- }
}
-
if ( endpoint.isRemote())
{
Object o = null;
- ServiceInfo remoteDelegateServiceInfo =
+ remoteDelegateServiceInfo =
getDelegateServiceInfo(key);
if ( remoteDelegateServiceInfo != null && remoteDelegateServiceInfo instanceof PrimitiveServiceInfo &&
( o = ((ProcessingResourceMetaData) resource).getConfigurationParameterSettings().getParameterValue(AnalysisEngineController.AEInstanceCount)) != null )
@@ -1432,7 +1730,6 @@
}
endpoint.setInitialized(true);
-
// If getMeta request not yet sent, send meta request to all remote delegate
// Special case when all delegates are remote is handled in the setInputChannel
@@ -1447,15 +1744,6 @@
unregisteredDelegateList.remove(i);
}
}
-/*
- // When all collocated delegates reply with metadata send request for meta to
- // remote delegates.
- if ( unregisteredDelegateList.size() == 0 )
- {
- requestForMetaSentToRemotes = true;
- sendRequestForMetadataToRemoteDelegates();
- }
- */
}
@@ -1472,6 +1760,10 @@
{
System.out.println("Setting Shadow Pool of Size:"+endpt.getShadowPoolSize()+" For Cas Multiplier:"+(String)remoteCasMultiplierList.get(i));
getCasManagerWrapper().initialize(endpt.getShadowPoolSize(),(String)remoteCasMultiplierList.get(i));
+ if ( remoteDelegateServiceInfo != null )
+ {
+ remoteDelegateServiceInfo.setCASMultiplier();
+ }
}
}
if ( !isStopped() )
@@ -1619,9 +1911,11 @@
pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
pServiceInfo.setState(serviceInfo.getState());
pServiceInfo.setAnalysisEngineInstanceCount(1);
-
ServicePerformance servicePerformance = new ServicePerformance();
-
+ if ( anEndpoint.isRemote() )
+ {
+ servicePerformance.setRemoteDelegate();
+ }
ServiceErrors serviceErrors = new ServiceErrors();
serviceErrorMap.put(key, serviceErrors);
@@ -1735,20 +2029,6 @@
}
}
-
- public boolean sendRequestToReleaseCas()
- {
-
- synchronized( counterMonitor )
- {
- if ( cmCasPoolSizeDelta > 0 && counter < cmCasPoolSizeDelta )
- {
- counter++;
- return true;
- }
- return false;
- }
- }
public ServiceErrors getServiceErrors(String aDelegateKey)
{
if ( !serviceErrorMap.containsKey(aDelegateKey ))
@@ -1761,7 +2041,7 @@
{
if ( serviceInfo == null )
{
- serviceInfo = new AggregateServiceInfo();
+ serviceInfo = new AggregateServiceInfo(isCasMultiplier());
if ( getInputChannel() != null )
{
serviceInfo.setInputQueueName(getInputChannel().getName());
@@ -1860,22 +2140,8 @@
super.stop();
cleanUp();
}
-/*
- public ServicePerformance getCasStatistics( String aCasReferenceId )
+ protected List getChildControllerList()
{
- ServicePerformance casStats = null;
- if ( perCasStatistics.containsKey(aCasReferenceId) )
- {
- casStats = (ServicePerformance)perCasStatistics.get(aCasReferenceId);
- }
- else
- {
- casStats = new ServicePerformance();
- perCasStatistics.put( aCasReferenceId, casStats);
- System.out.println("########## AggregateController.getCasStatistics()-Controller:"+getComponentName()+" Creating New ServicePerformance Object for Cas:"+aCasReferenceId+" Map HashCode:"+perCasStatistics.hashCode()+" Map Size:"+perCasStatistics.size());
- }
- return casStats;
+ return childControllerList;
}
-*/
-
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Fri Aug 22 11:53:05 2008
@@ -58,10 +58,6 @@
public InputChannel getInputChannel();
public InputChannel getInputChannel(String aQueueName );
-
- public long getIdleTime( String aKey );
-
- public void saveIdleTime( long snapshot, String aKey, boolean accumulate );
public void saveReplyTime( long snapshot, String aKey );
@@ -142,6 +138,8 @@
public void addTimeSnapshot( long snapshot, String aKey );
public ServicePerformance getServicePerformance();
+
+ public ServiceInfo getServiceInfo();
public long getTimeSnapshot( String aKey );
@@ -162,5 +160,29 @@
public void notifyListenersWithInitializationStatus(Exception e);
public ServicePerformance getCasStatistics( String aCasReferenceId );
+
+ public boolean isCasMultiplier();
+
+ public void releaseNextCas(String aCasReferenceId);
+
+// public long getTotalIdleTime();
+ public long getIdleTime();
+
+ // This is called every time a request comes
+ public void beginProcess(int msgType);
+
+ // This is called every time a request is completed
+ public void endProcess(int msgType);
+
+ // Returns the idle time between process CAS calls
+ public long getIdleTimeBetweenProcessCalls(int msgType);
+
+ public long getCpuTime();
+
+ public long getAnalysisTime();
+
+ public void incrementSerializationTime(long cpuTime);
+
+ public void incrementDeserializationTime(long cpuTime);
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java Fri Aug 22 11:53:05 2008
@@ -61,7 +61,7 @@
*/
public void checkin(AnalysisEngine anAnalysisEngine) throws Exception
{
- //
+ aeInstanceMap.put(Thread.currentThread().getName(), anAnalysisEngine);
}
/**
@@ -72,7 +72,7 @@
*
* @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#checkout()
**/
- public AnalysisEngine checkout() throws Exception
+ public synchronized AnalysisEngine checkout() throws Exception
{
AnalysisEngine ae = null;
@@ -95,13 +95,24 @@
}
}
// ae may have been assigned above already, no need to fetch it again
+
+ /*
if ( ae == null )
{
// Fetch ae instance from the map using thread name as key. This mechanism assures that a thread
// uses the same ae instance every time.
ae = (AnalysisEngine)aeInstanceMap.get(Thread.currentThread().getName()) ;
}
+
return ae;
+ */
+
+ if ( aeInstanceMap.containsKey(Thread.currentThread().getName()) )
+ {
+ return (AnalysisEngine)aeInstanceMap.remove(Thread.currentThread().getName()) ;
+ }
+ else
+ return null;
}
/* (non-Javadoc)