You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by jo...@apache.org on 2009/07/02 10:42:16 UTC
svn commit: r790499 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java
Author: joern
Date: Thu Jul 2 08:42:15 2009
New Revision: 790499
URL: http://svn.apache.org/viewvc?rev=790499&view=rev
Log:
UIMA-1409
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java?rev=790499&r1=790498&r2=790499&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java Thu Jul 2 08:42:15 2009
@@ -53,12 +53,20 @@
AsyncCallback callback;
}
- // TODO: ups need thread safety here
- class UimaStatusCallbackListener extends UimaAsBaseCallbackListener {
+ private static class UimaStatusCallbackListener extends UimaAsBaseCallbackListener {
+ /**
+ * Shared intermediate map instance. It is used to map the CAS reference id
+ * to the input <code>Exchange</code>.
+ */
+ private final Map<String, ExchangeAsyncCallbackPair> intermediateMap;
+
+ private UimaStatusCallbackListener(Map<String, ExchangeAsyncCallbackPair> intermediateMap) {
+ this.intermediateMap = intermediateMap;
+ }
+
public void initializationComplete(EntityProcessStatus aStatus) {
- // TODO:
- // Log error status
+
}
public void entityProcessComplete(CAS aCas, EntityProcessStatus aStatus) {
@@ -93,7 +101,7 @@
* The intermediate map keeps all {@link Exchange}s and their callbacks until asynchronous
* processing is finished.
*/
- private Map<String, ExchangeAsyncCallbackPair> intermediateMap;
+ private final Map<String, ExchangeAsyncCallbackPair> intermediateMap;
public UimaAsProducer(String brokerAddress, String queue, Endpoint<Exchange> endpoint)
throws Exception {
@@ -103,7 +111,7 @@
uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
- uimaAsEngine.addStatusCallbackListener(new UimaStatusCallbackListener());
+ uimaAsEngine.addStatusCallbackListener(new UimaStatusCallbackListener(intermediateMap));
Map<String, Object> appCtx = new HashMap<String, Object>();
appCtx.put(UimaAsynchronousEngine.ServerUri, brokerAddress);
@@ -117,6 +125,9 @@
}
}
+ /**
+ * Not implemented, since the producer implements the AsyncProcessor interface.
+ */
public void process(Exchange exchange) throws Exception {
}
@@ -130,8 +141,23 @@
CAS cas = uimaAsEngine.getCAS();
cas.setDocumentText(rowId);
+
+ // The intermediate map must be locked for sending
+ // a CAS to the service to guarantee that the reference
+ // id for the CAS is inserted into the intermediate map
+ // before the callback listener tries to map
+ // the just returned reference id to an exchange
+
+ synchronized (intermediateMap) {
+
+ refernceId = uimaAsEngine.sendCAS(cas);
+
+ ExchangeAsyncCallbackPair exchangeCallback = new ExchangeAsyncCallbackPair();
+ exchangeCallback.exchange = exchange;
+ exchangeCallback.callback = callback;
- refernceId = uimaAsEngine.sendCAS(cas);
+ intermediateMap.put(refernceId, exchangeCallback);
+ }
} catch (Exception e) {
// Processing of the exchange failed
@@ -143,13 +169,7 @@
callback.done(true);
return true;
}
-
- ExchangeAsyncCallbackPair exchangeCallback = new ExchangeAsyncCallbackPair();
- exchangeCallback.exchange = exchange;
- exchangeCallback.callback = callback;
-
- intermediateMap.put(refernceId, exchangeCallback);
-
+
return false;
}
}