You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by jo...@apache.org on 2009/07/02 10:42:16 UTC

svn commit: r790499 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java

Author: joern
Date: Thu Jul  2 08:42:15 2009
New Revision: 790499

URL: http://svn.apache.org/viewvc?rev=790499&view=rev
Log:
UIMA-1409

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java?rev=790499&r1=790498&r2=790499&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-camel/src/main/java/org/apache/uima/camel/UimaAsProducer.java Thu Jul  2 08:42:15 2009
@@ -53,12 +53,20 @@
     AsyncCallback callback;
   }
 
-  // TODO: ups need thread safety here
-  class UimaStatusCallbackListener extends UimaAsBaseCallbackListener {
+  private static class UimaStatusCallbackListener extends UimaAsBaseCallbackListener {
 
+    /**
+     * Shared intermediate map instance. It is used to map the CAS reference id
+     * to the input <code>Exchange</code>.
+     */
+    private final Map<String, ExchangeAsyncCallbackPair> intermediateMap;
+    
+    private UimaStatusCallbackListener(Map<String, ExchangeAsyncCallbackPair> intermediateMap) {
+      this.intermediateMap = intermediateMap;
+    }
+    
     public void initializationComplete(EntityProcessStatus aStatus) {
-      // TODO:
-      // Log error status
+
     }
 
     public void entityProcessComplete(CAS aCas, EntityProcessStatus aStatus) {
@@ -93,7 +101,7 @@
    * The intermediate map keeps all {@link Exchange}s and their callbacks until asynchronous
    * processing is finished.
    */
-  private Map<String, ExchangeAsyncCallbackPair> intermediateMap;
+  private final Map<String, ExchangeAsyncCallbackPair> intermediateMap;
 
   public UimaAsProducer(String brokerAddress, String queue, Endpoint<Exchange> endpoint)
           throws Exception {
@@ -103,7 +111,7 @@
 
     uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
 
-    uimaAsEngine.addStatusCallbackListener(new UimaStatusCallbackListener());
+    uimaAsEngine.addStatusCallbackListener(new UimaStatusCallbackListener(intermediateMap));
 
     Map<String, Object> appCtx = new HashMap<String, Object>();
     appCtx.put(UimaAsynchronousEngine.ServerUri, brokerAddress);
@@ -117,6 +125,9 @@
     }
   }
 
+  /**
+   * Not implemented, since the producer implements the AsyncProcessor interface.
+   */
   public void process(Exchange exchange) throws Exception {
   }
 
@@ -130,8 +141,23 @@
       CAS cas = uimaAsEngine.getCAS();
 
       cas.setDocumentText(rowId);
+      
+      // The intermediate map must be locked while sending
+      // a CAS to the service, to guarantee that the reference
+      // id for the CAS is inserted into the intermediate map
+      // before the callback listener tries to map
+      // the just-returned reference id to an exchange.
+      
+      synchronized (intermediateMap) {
+        
+        refernceId = uimaAsEngine.sendCAS(cas);
+      
+        ExchangeAsyncCallbackPair exchangeCallback = new ExchangeAsyncCallbackPair();
+        exchangeCallback.exchange = exchange;
+        exchangeCallback.callback = callback;
 
-      refernceId = uimaAsEngine.sendCAS(cas);
+        intermediateMap.put(refernceId, exchangeCallback);
+      }
 
     } catch (Exception e) {
       // Processing of the exchange failed
@@ -143,13 +169,7 @@
       callback.done(true);
       return true;
     }
-
-    ExchangeAsyncCallbackPair exchangeCallback = new ExchangeAsyncCallbackPair();
-    exchangeCallback.exchange = exchange;
-    exchangeCallback.callback = callback;
-
-    intermediateMap.put(refernceId, exchangeCallback);
-
+    
     return false;
   }
 }