Posted to user@uima.apache.org by Dietmar Gräbner <d....@gmail.com> on 2010/12/07 15:50:04 UTC

UimaAS blocks when accessing a queue with multiple clients concurrently

Hi,

I hope you can help me with a problem I have been struggling with for
quite a while now.

I wrote a test client creating multiple threads. Each thread
instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
aggregate with the sendAndReceiveCAS() call. When running the program
with e.g. 100 Threads the client gets stuck after processing X calls.

Environment:
-client:
- - uses the uima  2.3.0-incubating
- - the client is a modified copy of the RunRemoteAE.java uima
provides as an example
- - no timeouts configured except MetaTimeout

-server:
- - server runs on a different machine
- - 2.3.0 with JMX configured to monitor activeMq and the Uima Services
- - Deployment Descriptor
- - - the uima service has two delegates: WhitespaceTokenizer and a
SentenceAnnotator
- - - no extra error handling configured (see at the end of the email)



Detailed problem description:

- The UIMA service reports in the JMX stats that all documents have
been processed. The individual delegate logs report that the process
method has finished.
- Some of the client threads are blocked at the
AbstractQueuedSynchronizer. The others finished successfully.
- The temp queues in ActiveMQ still exist. Some contain messages, and
the enqueue and dequeue counts differ.
- I experimented with different configuration parameters in the AS
deployment descriptor, though I don't think that this is the problem.
- Everything works fine with a primitive UIMA service.
- I set the log settings to ALL, but couldn't find any exceptions.
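
To see which client threads are parked and what they are waiting on, a thread dump can be taken from inside the client JVM. The following is a minimal sketch using only the JDK's ThreadMXBean; the class name ParkedThreadReport and the demo thread are made up for illustration and are not part of UIMA AS. It lists every thread that is waiting with an AbstractQueuedSynchronizer frame on its stack.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical helper, not part of UIMA AS: reports threads parked inside
// java.util.concurrent synchronizers (latches, locks, semaphores).
final class ParkedThreadReport {

    /** Returns one line per waiting thread that has an AbstractQueuedSynchronizer frame on its stack. */
    static List<String> findThreadsParkedInAqs() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null) {
                continue; // thread ended while the dump was taken
            }
            Thread.State state = info.getThreadState();
            if (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING) {
                continue;
            }
            for (StackTraceElement frame : info.getStackTrace()) {
                if (frame.getClassName().startsWith(
                        "java.util.concurrent.locks.AbstractQueuedSynchronizer")) {
                    report.add(info.getThreadName() + " [" + state + "] waiting on "
                            + info.getLockName());
                    break;
                }
            }
        }
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        // Demo only: a thread that waits on a latch nobody counts down yet.
        CountDownLatch never = new CountDownLatch(1);
        Thread stuck = new Thread(() -> {
            try {
                never.await();
            } catch (InterruptedException e) {
                // exit quietly
            }
        }, "demo-stuck-worker");
        stuck.setDaemon(true);
        stuck.start();
        Thread.sleep(200); // give the demo thread time to park
        findThreadsParkedInAqs().forEach(System.out::println);
        never.countDown();
    }
}
```

Calling findThreadsParkedInAqs() from a watchdog thread in the test client after the hang shows the names of the stuck threads and the synchronizer they wait on. The same information is available externally with jstack.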


Thank you in advance.

Best regards,

Dietmar


-----
The simplest descriptor I used (I also tried one instance per thread used):

<analysisEngineDeploymentDescription
  xmlns="http://uima.apache.org/resourceSpecifier">
  <name>SentenceAnnotator</name>
  <description>Deploys SentenceAnnotator AE</description>
  <deployment protocol="jms" provider="activemq" >
    <service>
      <inputQueue endpoint="SentenceAnnotatorQueue"
brokerURL="${defaultBrokerURL}"/>
      <topDescriptor>
         <import location="AAE_WSTokenizerSentenceAnnotator.xml"/>
      </topDescriptor>
      <analysisEngine async="true" key="SentenceAnnotator"
internalReplyQueueScaleout="1" inputQueueScaleout="1">
              <delegates>
                <analysisEngine key="SentenceAnnotator">
                  <scaleout numberOfInstances="1"/>
                </analysisEngine>
                <analysisEngine key ="WhitespaceTokenizer">
                  <scaleout numberOfInstances="1"/>
                </analysisEngine>
              </delegates>
      </analysisEngine>
    </service>
  </deployment>
</analysisEngineDeploymentDescription>

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Dietmar Gräbner <d....@gmail.com>.
thx for the example - I'll test it tomorrow.

Best regards

On Thu, Dec 9, 2010 at 5:21 PM, Jaroslaw Cwiklik <ui...@gmail.com> wrote:
> For some reason attachments don't seem to work. Here is my code:
>
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  *   http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing,
>  * software distributed under the License is distributed on an
>  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>  * KIND, either express or implied.  See the License for the
>  * specific language governing permissions and limitations
>  * under the License.
>  */
>
>
> import java.util.HashMap;
> import java.util.Map;
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.uima.aae.client.UimaAsynchronousEngine;
> import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
> import org.apache.uima.cas.CAS;
>
> /**
>  * Example client application that can instantiate multiple UIMA AS clients,
>  * each running in a separate thread.
>  * <p>
>  * Arguments: brokerUrl endpoint <howManyCASesToSend> <scaleup>
>  * <p>
>  * The application creates as many UIMA AS clients and threads as specified
>  * in the "scaleup" argument. Each instance runs in its own thread and has
>  * its own temp reply queue. It uses a synchronous <i>sendAndReceive()</i>
>  * to send CASes to a remote service. For this a CAS pool containing a
>  * single CAS is sufficient.
>  * <p>
>  * Each client sends as many CASes to a remote service as specified in the
>  * "howManyCASesToSend" argument.
>  * <p>
>  * The application initializes a CountDownLatch to the number of
>  * clients/threads, which is then used to await completion. When a worker
>  * thread completes its run, it sends a CPC and counts down the latch,
>  * reducing the number of busy threads. When all threads finish, the
>  * application is notified and can proceed to cleanup and shutdown.
>  */
> public class MultithreadedClientApp {
>   public CountDownLatch latch = null;
>
>   public void initializeAndRun(String[] args) throws Exception {
>     try {
>       int howManyWorkers = Integer.parseInt(args[3]);  // how many threads
>       // each worker counts down when done
>       latch = new CountDownLatch(howManyWorkers);
>       // Create Worker threads
>       ClientWorker[] workers = new ClientWorker[howManyWorkers];
>       final ArrayBlockingQueue<Runnable> queue =
>           new ArrayBlockingQueue<Runnable>(howManyWorkers);
>       // Thread Pool Executor to manage threads
>       ThreadPoolExecutor threadPool = new ThreadPoolExecutor(howManyWorkers,
>           howManyWorkers, Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
>       // Start all threads
>       threadPool.prestartAllCoreThreads();
>       for (int i = 0; i < howManyWorkers; i++) {
>         workers[i] = new ClientWorker();
>         // 0 - brokerURL, 1 - queue name, 2 - how many CASes to send
>         workers[i].initialize(args[0], args[1], Integer.parseInt(args[2]));
>       }
>
>       for (int i = 0; i < howManyWorkers; i++) {
>         threadPool.submit(workers[i]);  // start the workers
>       }
>       // Each worker counts down the latch after it is done sending CASes
>       latch.await();
>       // All worker threads completed, now stop the clients
>       for (int i = 0; i < howManyWorkers; i++) {
>         workers[i].stop();  // stop UIMA AS clients
>       }
>
>       threadPool.shutdown();  // cleanup thread pool
>       System.out.println("All UIMA AS Clients Finished Processing");
>     } catch (Exception e) {
>       e.printStackTrace();
>     }
>   }
>
>   public static void main(String[] args) {
>     MultithreadedClientApp client = new MultithreadedClientApp();
>     try {
>       if (args.length != 4) {
>         System.out.println("Usage: java -cp <classpath> MultithreadedClientApp"
>             + " <brokerURL> <serviceQueueName>"
>             + " <howManyCASesEachThreadShouldSend> <howManyThreads>");
>         return;  // nothing to run without all four arguments
>       }
>       client.initializeAndRun(args);
>     } catch (Exception e) {
>       e.printStackTrace();
>     }
>   }
>
>   public class ClientWorker implements Runnable {
>     private BaseUIMAAsynchronousEngine_impl uimaASClient = null;
>     private int howManyCASes = 0;
>
>     public void initialize(String brokerUrl, String endpoint, int howManyCASes)
>         throws Exception {
>       uimaASClient = new BaseUIMAAsynchronousEngine_impl();
>       Map<String, Object> appCtx = new HashMap<String, Object>();
>       // set server URI and Endpoint
>       // Add Broker URI
>       appCtx.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
>       // Add Queue Name
>       appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
>       // Add the CAS pool size
>       appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
>
>       // initialize
>       uimaASClient.initialize(appCtx);
>       this.howManyCASes = howManyCASes;
>     }
>
>     public void stop() {
>       uimaASClient.stop();
>     }
>
>     public void run() {
>       try {
>         int sentSoFar = 0;
>         CAS cas = uimaASClient.getCAS();
>         int count = 1;
>         while (sentSoFar < howManyCASes) {
>           cas.setDocumentText("Some Text");
>
>           uimaASClient.sendAndReceiveCAS(cas);
>           System.out.println("Thread:" + Thread.currentThread().getId()
>               + "::: Success Calling sendAndReceiveCAS(). Sent " + count++
>               + " CASes so far");
>           cas.reset();
>           sentSoFar++;
>         }
>         uimaASClient.collectionProcessingComplete();
>         System.out.println("Thread::" + Thread.currentThread().getId()
>             + " Sent CPC. Thread Done");
>       } catch (Exception e) {
>         e.printStackTrace();
>       } finally {
>         // count down even on failure so latch.await() cannot block forever
>         latch.countDown();
>       }
>     }
>   }
> }
>
>
>
>
> On Thu, Dec 9, 2010 at 11:04 AM, Jaroslaw Cwiklik <ui...@gmail.com> wrote:
>
>> Dietmar, I tried my example application with an Aggregate Service and see
>> no problem. Your previous email had no source attached.
>> Attached please find the example application code I use in my testing. To
>> run it:
>>
>> java -cp <classpath> MultithreadedClientApp <brokerURL> <serviceQueueName>
>> <howManyCASesEachThreadShouldSend> <howManyThreads>
>>
>> The code adds a short text to each CAS before each call to
>> sendAndReceive(). There are no app listeners attached to UIMA AS client.
>>
>> Jerry
>>
>> 2010/12/9 Dietmar Gräbner <d....@gmail.com>
>>
>> Hi Eddie,
>>>
>>> wouldn't the client requests be serialized in the scenario you propose?
>>>
>>> Dietmar
>>>
>>>
>>>
>>> On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <ea...@gmail.com>
>>> wrote:
>>> > 2010/12/7 Dietmar Gräbner <d....@gmail.com>:
>>> >> I wrote a test client creating multiple threads. Each thread
>>> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>>> >> aggregate with the sendAndReceiveCAS() call. When running the program
>>> >> with e.g. 100 Threads the client gets stuck after processing X calls.
>>> >
>>> > FWIW, a similar multithreaded client scenario that has been used with
>>> > no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
>>> > with big enough casPool and have each thread call sendAndReceiveCAS()
>>> > using the common API object.
>>> >
>>> > Eddie
>>> >
>>>
>>
>>
>
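
The example client quoted above waits on a CountDownLatch that each worker counts down when it is done. If a worker leaves its run() method through an exception before reaching countDown(), the waiting thread never wakes up, which looks like a hang on the client side. A minimal sketch of guarding against that, using only the JDK, follows. The Task interface and the class name LatchedWorkers are made up for illustration and stand in for the per-thread loop of sendAndReceiveCAS() calls. It counts down in a finally block and waits with a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not UIMA AS code: a latch that is always counted down.
final class LatchedWorkers {

    /** Hypothetical stand-in for the work one client thread performs. */
    interface Task {
        void run() throws Exception;
    }

    /**
     * Runs every task on its own pool thread and waits for all of them.
     * Returns the number of failed tasks, or -1 if the wait timed out or
     * was interrupted.
     */
    static int runAll(Task[] tasks, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(tasks.length);
        AtomicInteger failures = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.length));
        try {
            for (Task task : tasks) {
                pool.execute(() -> {
                    try {
                        task.run();
                    } catch (Exception e) {
                        failures.incrementAndGet();
                    } finally {
                        done.countDown(); // runs on success, Exception and Error alike
                    }
                });
            }
            // Bounded wait: a stuck worker shows up as a timeout instead of a hang.
            boolean finished = done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return finished ? failures.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Task ok = () -> { };
        Task broken = () -> { throw new IllegalStateException("simulated failure"); };
        System.out.println("failures: " + runAll(new Task[] { ok, broken, ok }, 5_000));
    }
}
```

This does not explain a hang inside sendAndReceiveCAS() itself, but it helps tell apart "a worker died silently" from "a worker is still blocked in the call".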

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Dietmar Gräbner <d....@gmail.com>.
Hi,

I checked the logs again - I got no Exceptions. Just tried it with 10
Threads with the same result. Did you use the modified deployment
descriptor in your tests?
The hang only occurs with the <analysisEngine> tag defined in the
deployment descriptor.

Below is the sample descriptor of the Meeting Detector Aggregate:

thx

Dietmar


<analysisEngineDeploymentDescription
  xmlns="http://uima.apache.org/resourceSpecifier">

  <name>Meeting Detector TAE</name>
  <description>Deploys Meeting Detector Aggregate AE with all its
delegates in the same JVM.</description>

  <deployment protocol="jms" provider="activemq">
    <service>
      <inputQueue endpoint="MeetingDetectorTaeQueue"
brokerURL="tcp://url:61616"/>
      <topDescriptor>
       <import location="../../descriptors/tutorial/ex4/MeetingDetectorTAE.xml"/>
      </topDescriptor>
      <analysisEngine key="MeetingDetectorTae">
              <delegates>
                <analysisEngine key="RoomNumber">
                </analysisEngine>
                <analysisEngine key ="DateTime">
                </analysisEngine>
                <analysisEngine key ="Meeting">
                </analysisEngine>
              </delegates>
      </analysisEngine>
    </service>
  </deployment>

</analysisEngineDeploymentDescription>

On Fri, Dec 10, 2010 at 5:09 PM, Jaroslaw Cwiklik <ui...@gmail.com> wrote:
> Dietmar, I ran your code and so far I don't see a hang. I ran this code
> multiple times on my 2-core Thinkpad. Perhaps if you run this
> on a machine with more CPUs the timing changes and causes a race condition
> that results in a hang. I just don't see it.
>
> I *am* getting a hang if I don't provide enough memory to the process.  I
> tried to run your code with 200 threads
> and 600M memory and I got OOM and eventual hang. There were a lot of
> exceptions though which you say you don't see
> when you run. I've noticed that your code uses log4j, perhaps the exceptions
> are in the log?
>
> JC
>
> 2010/12/10 Dietmar Gräbner <d....@gmail.com>
>
>> Hi,
>>
>> I tested your example and it worked for both configurations
>> (MeetingDetectorTae with and without the <analysisEngine> part). The
>> main difference between your client and mine is the separation of the
>> initialize and the submit process in your example.
>> Here is my code (Main class and Worker Thread):
>>
>> The main class:
>>
>>
>> import org.apache.log4j.Logger;
>> import org.apache.log4j.xml.DOMConfigurator;
>>
>> /**
>>  *
>>  * A multithreaded testClient calling a worker.
>>  *
>>  */
>> public class MultithreadedTestClient {
>>
>>    //a server timeout not used in the current example
>>    public static final int LINGREP_SERVER_CONNECTION_TIMEOUT = 600000;
>>    public static final int TEST_NUMBEROFTHREADS = 100; // number of client threads
>>
>>
>>    public static Logger theLog = Logger.getLogger(MultithreadedTestClient.class);
>>    /**
>>     * Main method for the test
>>     *
>>     * @param args no arguments are parsed
>>     * @throws Exception some problem
>>     */
>>    public static void main(java.lang.String args[]) throws Exception {
>>
>>        // specify logfile settings
>>        if (System.getProperty("log4j.configuration")==null){
>>            DOMConfigurator.configure("resources/config/log4j.xml");
>>        }
>>
>>        for (int i = 0; i < TEST_NUMBEROFTHREADS; i++) {
>>            Runnable r = new MinimalWorkerThread(i);
>>            new Thread(r).start();
>>        }
>>    }
>>
>> }
>>
>> And the Thread Worker:
>>
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> import org.apache.uima.aae.client.UimaAsynchronousEngine;
>> import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
>> import org.apache.uima.cas.CAS;
>>
>> /**
>>  * a runnable initializing and calling the Webservice
>>  */
>> public class MinimalWorkerThread implements Runnable {
>>
>>    private int mId = 0;
>>    /**
>>     * The UIMA AS client used by this worker.
>>     */
>>    private UimaAsynchronousEngine uimaEEEngine = null;
>>    Map<String, Object> appCtx = new HashMap<String, Object>();
>>
>>    /**
>>     * Constructor for the class. Creates the UIMA AS client and fills the
>>     * application context with the broker URI, endpoint and CAS pool size.
>>     *
>>     * @param id
>>     *          numeric id of this worker, used in the console output
>>     */
>>    public MinimalWorkerThread(int id) throws Exception {
>>        mId = id;
>>        // Initialize the AppContext
>>        uimaEEEngine = new BaseUIMAAsynchronousEngine_impl();
>>        // Add Broker URI
>>        appCtx.put(UimaAsynchronousEngine.ServerUri, "tcp://143.205.174.93:61616");
>>        // Add Endpoint
>>        appCtx.put(UimaAsynchronousEngine.Endpoint, "MeetingDetectorTaeQueue");
>>        appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
>>    }
>>
>>    public void run() {
>>
>>        try {
>>            System.out.println("running " + mId);
>>            //initialize the client
>>            uimaEEEngine.initialize(appCtx);
>>            String text = "Id " + mId + "This is a nice test sentence. "
>>                    + "And a second. Including a third.";
>>            // get a CAS from the client's pool and set its document text
>>            CAS cas = uimaEEEngine.getCAS();
>>            //cas.setDocumentLanguage("en");
>>            cas.setDocumentText(text);
>>            uimaEEEngine.sendAndReceiveCAS(cas);
>>            uimaEEEngine.collectionProcessingComplete();
>>            System.out.println("Thread id " + mId + " returned "
>>                    + cas.getDocumentText().substring(0, 5));
>>            cas.reset();
>>            uimaEEEngine.stop();
>>        } catch (Exception e) {
>>            System.err.println("Exception during Processing!");
>>            e.printStackTrace();
>>        }
>>    }
>> }

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Jaroslaw Cwiklik <ui...@gmail.com>.
Dietmar, I ran your code and so far I don't see a hang. I ran this code
multiple times on my 2-core Thinkpad. Perhaps if you run this
on a machine with more CPUs the timing changes and causes a race condition
that results in a hang. I just don't see it.

I *am* getting a hang if I don't provide enough memory to the process.  I
tried to run your code with 200 threads
and 600M memory and I got OOM and eventual hang. There were a lot of
exceptions though which you say you don't see
when you run. I've noticed that your code uses log4j, perhaps the exceptions
are in the log?

JC
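
One possible reason for not seeing such failures: the worker in the quoted client catches only Exception, so an Error such as OutOfMemoryError would bypass that catch block and end the thread through its uncaught exception handler, which by default prints to stderr rather than to a log file. The following is a minimal sketch, using only the JDK, of recording such failures explicitly and of printing the heap limit the JVM actually runs with. The class name WorkerFailureLog and the thread names are made up for illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper, not part of UIMA AS: makes worker thread deaths visible.
final class WorkerFailureLog {

    /** Failures collected from worker threads, in arrival order. */
    static final ConcurrentLinkedQueue<String> FAILURES = new ConcurrentLinkedQueue<>();

    /** Starts a thread whose uncaught Throwable (Exception or Error) is recorded and printed. */
    static Thread startLogged(String name, Runnable body) {
        Thread t = new Thread(body, name);
        t.setUncaughtExceptionHandler((thread, error) -> {
            FAILURES.add(thread.getName() + ": " + error);
            System.err.println("Worker " + thread.getName() + " died: " + error);
        });
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        // Heap limit of this JVM, e.g. as set with -Xmx on the command line.
        long maxMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
        System.out.println("Max heap available to this JVM: " + maxMb + " MB");

        Thread t = startLogged("demo-worker", () -> {
            throw new IllegalStateException("simulated failure");
        });
        t.join();
        System.out.println("Recorded failures: " + FAILURES);
    }
}
```

In a real client the handler body could forward the Throwable to the log4j logger that the test client already configures.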

> On Thu, Dec 9, 2010 at 5:21 PM, Jaroslaw Cwiklik <ui...@gmail.com> wrote:
> > For some reason attachment dont seem to work. Here is my code:
> >
> > /*
> >  * Licensed to the Apache Software Foundation (ASF) under one
> >  * or more contributor license agreements.  See the NOTICE file
> >  * distributed with this work for additional information
> >  * regarding copyright ownership.  The ASF licenses this file
> >  * to you under the Apache License, Version 2.0 (the
> >  * "License"); you may not use this file except in compliance
> >  * with the License.  You may obtain a copy of the License at
> >  *
> >  *   http://www.apache.org/licenses/LICENSE-2.0
> >  *
> >  * Unless required by applicable law or agreed to in writing,
> >  * software distributed under the License is distributed on an
> >  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >  * KIND, either express or implied.  See the License for the
> >  * specific language governing permissions and limitations
> >  * under the License.
> >  */
> >
> >
> > import java.util.HashMap;
> > import java.util.Map;
> > import java.util.concurrent.ArrayBlockingQueue;
> > import java.util.concurrent.CountDownLatch;
> > import java.util.concurrent.ThreadPoolExecutor;
> > import java.util.concurrent.TimeUnit;
> >
> > import org.apache.uima.aae.client.UimaAsynchronousEngine;
> > import
> org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
> > import org.apache.uima.cas.CAS;
> >
> > /**
> >  * Example client application that can instantiate multiple UIMA AS
> clients
> > each running in
> >  * a separate thread.
> >  * <p>
> >  * Arguments: brokerUrl endpoint <howManyCASesToSend> <scaleup>
> >  * <p>
> >  * The application creates as many UIMA AS clients and threads as
> specified
> > in the "scaleup"
> >  * argument. Each instance runs in its own thread and has is its own temp
> > reply queue. It
> >  * uses a synchronous <i>sendAndReceive()</i> to send CASes to a remote
> > service. For this
> >  * a CAS Pool containing a single CAS is sufficient.
> >  * <p>
> >  * Each client sends as many CASes to a remote service as specified in
> the
> > "howManyCASesToSend"
> >  * argument.
> >  * <p>
> >  * The application initializes a CountDownLatch to the number of
> > clients/threads which is than
> >  * used to await completion. When a worker thread completes its run, it
> > sends a CPC and counts down the
> >  * latch reducing the number of busy threads. When all threads finish,
> the
> > application is notified
> >  * and can proceed to cleanup and shutdown.
> >  *
> >  */
> > public class MultithreadedClientApp {
> > public CountDownLatch latch = null;
> > public void initializeAndRun(String[] args) throws Exception {
> > try {
> > int howManyWorkers = Integer.parseInt(args[3]);  // how many threads
> > latch = new CountDownLatch(howManyWorkers);  // each worker counts down
> when
> > done
> > // Create Worker threads
> > ClientWorker[] workers = new ClientWorker[howManyWorkers];
> > final ArrayBlockingQueue<Runnable> queue = new
> ArrayBlockingQueue<Runnable>(
> > howManyWorkers);
> > // Thread Pool Executor to manages threads
> > ThreadPoolExecutor threadPool = new ThreadPoolExecutor(howManyWorkers,
> > howManyWorkers,
> >                Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
> > // Start all threads
> > threadPool.prestartAllCoreThreads();
> > for( int i=0; i < howManyWorkers; i++ ) {
> > workers[i] = new ClientWorker();
> > // 0 - brokerURL, 1 - queue name, 2 - how many CASes to send
> > workers[i].initialize(args[0],args[1],Integer.parseInt(args[2]));
> > }
> >
> > for( int i=0; i < howManyWorkers; i++ ) {
> > threadPool.submit(workers[i]);  // start the workers
> > }
> > // Each worker counts down the latch after it is done sending CASes
> > latch.await();
> >  // All worker threads completed, now stop the clients
> > for( int i=0; i < howManyWorkers; i++ ) {
> > workers[i].stop();  // stop UIMA AS clients
> > }
> >
> > threadPool.shutdown();  // cleanup thread pool
> >  System.out.println("All UIMA AS Clients Finished Processing");
> > } catch( Exception e ) {
> > e.printStackTrace();
> > }
> >  }
> > public static void main(String[] args) {
> > MultithreadedClientApp client = new MultithreadedClientApp();
> > try {
> > if ( args.length != 4 ) {
> > System.out.println("Usage: ");
> > }
> > client.initializeAndRun(args);
> > } catch( Exception e ) {
> > e.printStackTrace();
> > }
> > }
> > public class ClientWorker implements Runnable {
> > private BaseUIMAAsynchronousEngine_impl uimaASClient = null;
> > private int howManyCASes = 0;
> > public void initialize(String brokerUrl, String endpoint, int
> howManyCASes )
> > throws Exception {
> >  uimaASClient = new BaseUIMAAsynchronousEngine_impl();
> > Map<String, Object> appCtx = new HashMap<String, Object>();
> >    // set server URI and Endpoint
> >    // Add Broker URI
> >    appCtx.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
> >    // Add Queue Name
> >    appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
> >    // Add the Cas Pool Size and initial FS heap size
> >    appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
> >
> >    // initialize
> >    uimaASClient.initialize(appCtx);
> >    this.howManyCASes = howManyCASes;
> > }
> > public void stop() {
> > uimaASClient.stop();
> > }
> > public void run() {
> > try {
> >    int sentSoFar = 0;
> >             CAS cas = uimaASClient.getCAS();
> >             int count=1;
> >     while( sentSoFar < howManyCASes ) {
> >
> >          cas.setDocumentText("Some Text");
> >
> >          uimaASClient.sendAndReceiveCAS(cas);
> >          System.out.println("Thread:"+Thread.currentThread().getId()+":::
> > Success Calling sendAndReceiveCAS(). Sent "+count++ + " CASes so far");
> >          cas.reset();
> >          sentSoFar++;
> >     }
> >     uimaASClient.collectionProcessingComplete();
> >     System.out.println("Thread::"+Thread.currentThread().getId()+" Sent
> > CPC. Thread Done");
> >     latch.countDown();
> > } catch( Exception e) {
> > e.printStackTrace();
> > }
> > }
> > }
> > }
> >
> >
> >
> >
> > On Thu, Dec 9, 2010 at 11:04 AM, Jaroslaw Cwiklik <ui...@gmail.com>
> wrote:
> >
> >> Dietmar, I tried my example application with an Aggregate Service and
> see
> >> no problem. Your previous email had no source attached.
> >> Attached please find an example application code I use in my testing. To
> >> run it"
> >>
> >> java -cp <classpath> MultithreadedClientApp
> >>
> <brokerURL><serviceQueueName><howManyCASesEachThreadShouldSend><howManyThreads>
> >>
> >> The code adds a short text to each CAS before each call to
> >> sendAndReceive(). There are no app listeners attached to UIMA AS client.
> >>
> >> Jerry
> >>
> >> 2010/12/9 Dietmar Gräbner <d....@gmail.com>
> >>
> >> Hi Eddie,
> >>>
> >>> wouldn't the client requests be serialized in the szenario you propose?
> >>>
> >>> Dietmar
> >>>
> >>>
> >>>
> >>> On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <ea...@gmail.com>
> >>> wrote:
> >>> > 2010/12/7 Dietmar Gräbner <d....@gmail.com>:
> >>> >> I wrote a test client creating multiple threads. Each thread
> >>> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
> >>> >> aggregate with the sendAndReceiveCAS() call. When running the
> program
> >>> >> with e.g. 100 Threads the client gets stuck after processing X
> calls.
> >>> >
> >>> > FWIW, a similar multithreaded client scenario that has been used with
> >>> > no problems is to instantiate a single
> BaseUIMAAsynchronousEngine_impl
> >>> > with big enough casPool and have each thread call sendAndReceiveCAS()
> >>> > using the common API object.
> >>> >
> >>> > Eddie
> >>> >
> >>>
> >>
> >>
> >
>

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Dietmar Gräbner <d....@gmail.com>.
Hi,

I tested your example and it worked for both configurations
(MeetingDetectorTae with and without the <analysisEngine> part). The
main difference between your client and mine is that your example
separates the initialize step from the submit step.
Here is my code (Main class and Worker Thread):

The main class:


import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;

/**
 *
 * A multithreaded testClient calling a worker.
 *
 */
public class MultithreadedTestClient {

    // a server timeout, not used in the current example
    public static final int LINGREP_SERVER_CONNECTION_TIMEOUT = 600000;
    public static final int TEST_NUMBEROFTHREADS = 100; // 100 threads


    public static Logger theLog =
        Logger.getLogger(MultithreadedTestClient.class);
    /**
     * Main method for the test
     *
     * @param args no arguments are parsed
     * @throws Exception some problem
     */
    public static void main(java.lang.String args[]) throws Exception {

        // specify logfile settings
        if (System.getProperty("log4j.configuration")==null){
            DOMConfigurator.configure("resources/config/log4j.xml");
        }

        for (int i = 0; i < TEST_NUMBEROFTHREADS; i++) {
            Runnable r = new MinimalWorkerThread(i);
            new Thread(r).start();
        }
    }

}

And the Thread Worker:

import java.util.HashMap;
import java.util.Map;

import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.cas.CAS;

/**
 * A runnable that initializes a UIMA AS client and calls the remote service.
 */
public class MinimalWorkerThread implements Runnable {

    private int mId = 0;
    /**
     * The UIMA AS client used by this worker.
     */
    private UimaAsynchronousEngine uimaEEEngine = null;
    Map<String, Object> appCtx = new HashMap<String, Object>();

    /**
     * Constructor for the class. Creates the client and fills the
     * application context; the client itself is initialized in run().
     *
     * @param id
     *          number identifying this worker in the output
     */
    public MinimalWorkerThread(int id) throws Exception {
        mId = id;
        // Initialize the AppContext
        uimaEEEngine = new BaseUIMAAsynchronousEngine_impl();
        // Add Broker URI
        appCtx.put(UimaAsynchronousEngine.ServerUri,
                "tcp://143.205.174.93:61616");
        // Add Endpoint
        appCtx.put(UimaAsynchronousEngine.Endpoint, "MeetingDetectorTaeQueue");
        appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
    }

    public void run() {

        try {
            System.out.println("running " + mId);
            // initialize the client
            uimaEEEngine.initialize(appCtx);
            String text = "Id " + mId
                    + "This is a nice test sentence. And a second. Including a third.";
            // get a CAS from the pool and set its document text
            CAS cas = uimaEEEngine.getCAS();
            //cas.setDocumentLanguage("en");
            cas.setDocumentText(text);
            uimaEEEngine.sendAndReceiveCAS(cas);
            uimaEEEngine.collectionProcessingComplete();
            System.out.println("Thread id " + mId + " returned "
                    + cas.getDocumentText().substring(0, 5));
            cas.reset();
            uimaEEEngine.stop();
        } catch (Exception e) {
            System.err.println("Exception during Processing!");
            e.printStackTrace();
        }
    }
}
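
To make the structural difference concrete, here is a small sketch of the two-phase start-up used in the other client: every worker is initialized on the main thread first, and only then are the workers submitted to a pool. StubWorker is a hypothetical placeholder for a worker that would wrap a UIMA AS client; nothing here talks to a broker. The latch is counted down in a finally block so the main thread cannot wait forever if a worker throws. Whether this ordering has any bearing on the hang is not established here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TwoPhaseStartup {

  /** Hypothetical worker; a real one would hold a UIMA AS client instance. */
  static class StubWorker implements Runnable {
    private final int id;
    private final CountDownLatch done;
    private final AtomicInteger finished;
    private boolean initialized = false;

    StubWorker(int id, CountDownLatch done, AtomicInteger finished) {
      this.id = id;
      this.done = done;
      this.finished = finished;
    }

    /** Phase 1: called sequentially on the main thread. */
    void initialize() {
      initialized = true;
    }

    /** Phase 2: runs on a pool thread. */
    public void run() {
      try {
        if (!initialized) {
          throw new IllegalStateException("worker " + id + " not initialized");
        }
        finished.incrementAndGet(); // stands in for sending CASes
      } finally {
        done.countDown();           // always release the waiting main thread
      }
    }
  }

  /** Initializes all workers, then submits them; returns how many finished. */
  static int runAll(int howManyWorkers) throws InterruptedException {
    CountDownLatch done = new CountDownLatch(howManyWorkers);
    AtomicInteger finished = new AtomicInteger();
    List<StubWorker> workers = new ArrayList<StubWorker>();
    for (int i = 0; i < howManyWorkers; i++) {
      StubWorker w = new StubWorker(i, done, finished);
      w.initialize();
      workers.add(w);
    }
    ExecutorService pool = Executors.newFixedThreadPool(howManyWorkers);
    for (StubWorker w : workers) {
      pool.submit(w);
    }
    boolean allDone = done.await(30, TimeUnit.SECONDS);
    pool.shutdown();
    if (!allDone) {
      throw new IllegalStateException("workers did not finish in time");
    }
    return finished.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runAll(10) + " workers finished");
  }
}
```

Using a timed await instead of a bare await() is a deliberate choice in this sketch: a stuck worker then shows up as an error rather than as a silent hang.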










Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Jaroslaw Cwiklik <ui...@gmail.com>.
For some reason attachments don't seem to work. Here is my code:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */


import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.cas.CAS;

/**
 * Example client application that can instantiate multiple UIMA AS clients,
 * each running in a separate thread.
 * <p>
 * Arguments: brokerUrl endpoint <howManyCASesToSend> <scaleup>
 * <p>
 * The application creates as many UIMA AS clients and threads as specified
 * in the "scaleup" argument. Each instance runs in its own thread and has
 * its own temp reply queue. It uses a synchronous <i>sendAndReceiveCAS()</i>
 * to send CASes to a remote service. For this a CAS pool containing a
 * single CAS is sufficient.
 * <p>
 * Each client sends as many CASes to a remote service as specified in the
 * "howManyCASesToSend" argument.
 * <p>
 * The application initializes a CountDownLatch to the number of
 * clients/threads, which is then used to await completion. When a worker
 * thread completes its run, it sends a CPC and counts down the latch,
 * reducing the number of busy threads. When all threads finish, the
 * application is notified and can proceed to cleanup and shutdown.
 */
public class MultithreadedClientApp {
  public CountDownLatch latch = null;

  public void initializeAndRun(String[] args) throws Exception {
    try {
      int howManyWorkers = Integer.parseInt(args[3]); // how many threads
      // each worker counts down when done
      latch = new CountDownLatch(howManyWorkers);
      // Create worker threads
      ClientWorker[] workers = new ClientWorker[howManyWorkers];
      final ArrayBlockingQueue<Runnable> queue =
          new ArrayBlockingQueue<Runnable>(howManyWorkers);
      // Thread pool executor to manage threads
      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(howManyWorkers,
          howManyWorkers, Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
      // Start all threads
      threadPool.prestartAllCoreThreads();
      for (int i = 0; i < howManyWorkers; i++) {
        workers[i] = new ClientWorker();
        // 0 - brokerURL, 1 - queue name, 2 - how many CASes to send
        workers[i].initialize(args[0], args[1], Integer.parseInt(args[2]));
      }

      for (int i = 0; i < howManyWorkers; i++) {
        threadPool.submit(workers[i]); // start the workers
      }
      // Each worker counts down the latch after it is done sending CASes
      latch.await();
      // All worker threads completed, now stop the clients
      for (int i = 0; i < howManyWorkers; i++) {
        workers[i].stop(); // stop UIMA AS clients
      }

      threadPool.shutdown(); // cleanup thread pool
      System.out.println("All UIMA AS Clients Finished Processing");
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) {
    MultithreadedClientApp client = new MultithreadedClientApp();
    try {
      if (args.length != 4) {
        System.out.println("Usage: java MultithreadedClientApp <brokerURL> "
            + "<serviceQueueName> <howManyCASesEachThreadShouldSend> <howManyThreads>");
        return; // do not continue with missing arguments
      }
      client.initializeAndRun(args);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public class ClientWorker implements Runnable {
    private BaseUIMAAsynchronousEngine_impl uimaASClient = null;
    private int howManyCASes = 0;

    public void initialize(String brokerUrl, String endpoint, int howManyCASes)
        throws Exception {
      uimaASClient = new BaseUIMAAsynchronousEngine_impl();
      Map<String, Object> appCtx = new HashMap<String, Object>();
      // Add Broker URI
      appCtx.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
      // Add Queue Name
      appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
      // Add the CAS pool size
      appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);

      // initialize
      uimaASClient.initialize(appCtx);
      this.howManyCASes = howManyCASes;
    }

    public void stop() {
      uimaASClient.stop();
    }

    public void run() {
      try {
        int sentSoFar = 0;
        CAS cas = uimaASClient.getCAS();
        int count = 1;
        while (sentSoFar < howManyCASes) {
          cas.setDocumentText("Some Text");
          uimaASClient.sendAndReceiveCAS(cas);
          System.out.println("Thread:" + Thread.currentThread().getId()
              + "::: Success Calling sendAndReceiveCAS(). Sent " + count++
              + " CASes so far");
          cas.reset();
          sentSoFar++;
        }
        uimaASClient.collectionProcessingComplete();
        System.out.println("Thread::" + Thread.currentThread().getId()
            + " Sent CPC. Thread Done");
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        // count down even on failure so the main thread is not left waiting
        latch.countDown();
      }
    }
  }
}





Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Jaroslaw Cwiklik <ui...@gmail.com>.
Dietmar, I tried my example application with an Aggregate Service and see no
problem. Your previous email had no source attached.
Attached please find the example application code I use in my testing. To run
it:

java -cp <classpath> MultithreadedClientApp
<brokerURL> <serviceQueueName> <howManyCASesEachThreadShouldSend> <howManyThreads>

The code adds a short text to each CAS before each call to sendAndReceiveCAS().
There are no app listeners attached to the UIMA AS client.

Jerry

2010/12/9 Dietmar Gräbner <d....@gmail.com>

> Hi Eddie,
>
> wouldn't the client requests be serialized in the szenario you propose?
>
> Dietmar
>
>
>
> On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <ea...@gmail.com>
> wrote:
> > 2010/12/7 Dietmar Gräbner <d....@gmail.com>:
> >> I wrote a test client creating multiple threads. Each thread
> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
> >> aggregate with the sendAndReceiveCAS() call. When running the program
> >> with e.g. 100 Threads the client gets stuck after processing X calls.
> >
> > FWIW, a similar multithreaded client scenario that has been used with
> > no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
> > with big enough casPool and have each thread call sendAndReceiveCAS()
> > using the common API object.
> >
> > Eddie
> >
>

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Dietmar Gräbner <d....@gmail.com>.
Hi Eddie,

Wouldn't the client requests be serialized in the scenario you propose?

Dietmar



On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <ea...@gmail.com> wrote:
> 2010/12/7 Dietmar Gräbner <d....@gmail.com>:
>> I wrote a test client creating multiple threads. Each thread
>> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>> aggregate with the sendAndReceiveCAS() call. When running the program
>> with e.g. 100 Threads the client gets stuck after processing X calls.
>
> FWIW, a similar multithreaded client scenario that has been used with
> no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
> with big enough casPool and have each thread call sendAndReceiveCAS()
> using the common API object.
>
> Eddie
>

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Dietmar Gräbner <d....@gmail.com>.
I deployed the ./examples/deploy/as/Deploy_MeetingDetectorTAE.xml and
started 200 Threads. The same problem occurs.

2010/12/9 Dietmar Gräbner <d....@gmail.com>:
> Hi Jaroslaw,
>
> I tried my Example with the RoomNumberAnnotator service - it worked
> because it is a primitive service. I only have problems with
> aggregates defining delegates.
> I tried the Deploy_MeetingFinder.xml in the example folder, but the
> service doesn't work. I have to check the Exceptions.
>
> In the attachment you can find the test client. The WorkerThread is a
> copy of the RunRemoteAE example shipped with the uima relase. The
> MultiThreadedTestClient creates the threads.
>
> thx
>
> Dietmar
>
>
> On Wed, Dec 8, 2010 at 2:52 AM, Jaroslaw Cwiklik <ui...@gmail.com> wrote:
>> I've created a multi-threaded application that uses a
>>  BaseUIMAAsynchronousEngine_impl  instance per thread. Scaled this up to 100
>> threads (100 clients). Ran this many times with a single instance of
>> RoomNumber Annotator service (from uima as examples). No hangs. Can you try
>> to run your application with a simple UIMA AS service like RoomNumber
>> Annotator. You can find the deployment descriptor for it in the
>> UIMA_HOME/examples/deploy/as.Its called Deploy_RoomNumberAnnotator.xml.
>>
>>
>>
>> On Tue, Dec 7, 2010 at 4:54 PM, Eddie Epstein <ea...@gmail.com> wrote:
>>
>>> 2010/12/7 Dietmar Gräbner <d....@gmail.com>:
>>> > I wrote a test client creating multiple threads. Each thread
>>> > instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>>> > aggregate with the sendAndReceiveCAS() call. When running the program
>>> > with e.g. 100 Threads the client gets stuck after processing X calls.
>>>
>>> FWIW, a similar multithreaded client scenario that has been used with
>>> no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
>>> with big enough casPool and have each thread call sendAndReceiveCAS()
>>> using the common API object.
>>>
>>> Eddie
>>>
>>
>

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Dietmar Gräbner <d....@gmail.com>.
Hi Jaroslaw,

I tried my example with the RoomNumberAnnotator service. It worked
because it is a primitive service. I only have problems with
aggregates defining delegates.
I tried the Deploy_MeetingFinder.xml in the example folder, but the
service doesn't work. I have to check the exceptions.

In the attachment you can find the test client. The WorkerThread is a
copy of the RunRemoteAE example shipped with the UIMA release. The
MultithreadedTestClient creates the threads.

thx

Dietmar


On Wed, Dec 8, 2010 at 2:52 AM, Jaroslaw Cwiklik <ui...@gmail.com> wrote:
> I've created a multi-threaded application that uses a
>  BaseUIMAAsynchronousEngine_impl  instance per thread. Scaled this up to 100
> threads (100 clients). Ran this many times with a single instance of
> RoomNumber Annotator service (from uima as examples). No hangs. Can you try
> to run your application with a simple UIMA AS service like RoomNumber
> Annotator. You can find the deployment descriptor for it in the
> UIMA_HOME/examples/deploy/as.Its called Deploy_RoomNumberAnnotator.xml.
>
>
>
> On Tue, Dec 7, 2010 at 4:54 PM, Eddie Epstein <ea...@gmail.com> wrote:
>
>> 2010/12/7 Dietmar Gräbner <d....@gmail.com>:
>> > I wrote a test client creating multiple threads. Each thread
>> > instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>> > aggregate with the sendAndReceiveCAS() call. When running the program
>> > with e.g. 100 Threads the client gets stuck after processing X calls.
>>
>> FWIW, a similar multithreaded client scenario that has been used with
>> no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
>> with big enough casPool and have each thread call sendAndReceiveCAS()
>> using the common API object.
>>
>> Eddie
>>
>

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Dietmar Gräbner <d....@gmail.com>.
Hi,

Finally I found the root of the problem: the analysisEngine/delegates
section in the deployment descriptor.

I modified the meeting detector TAE "Deploy_MeetingDetectorTAE.xml"
file: the <analysisEngine> section was added without any scaleout
information. Using this deployment descriptor with 100 threads
(clients) calling the service asynchronously leads to the
blocking.

I tried it with both AMQ 4.1.1 and the latest version 5.X.

Here is the descriptor:
--------------
<analysisEngineDeploymentDescription
  xmlns="http://uima.apache.org/resourceSpecifier">

  <name>Meeting Detector TAE</name>
  <description>Deploys Meeting Detector Aggregate AE with all its delegates in the same JVM.</description>

  <deployment protocol="jms" provider="activemq">
    <service>
      <inputQueue endpoint="MeetingDetectorTaeQueue" brokerURL="${defaultBrokerURL}"/>
      <topDescriptor>
        <import location="../../descriptors/tutorial/ex4/MeetingDetectorTAE.xml"/>
      </topDescriptor>

      <analysisEngine key="MeetingDetectorTae">
        <delegates>
          <analysisEngine key="RoomNumber">
          </analysisEngine>
          <analysisEngine key="DateTime">
          </analysisEngine>
          <analysisEngine key="Meeting">
          </analysisEngine>
        </delegates>
      </analysisEngine>

    </service>
  </deployment>

</analysisEngineDeploymentDescription>
----------------

thx

Dietmar






On Wed, Dec 8, 2010 at 2:52 AM, Jaroslaw Cwiklik <ui...@gmail.com> wrote:
> I've created a multi-threaded application that uses a
>  BaseUIMAAsynchronousEngine_impl  instance per thread. Scaled this up to 100
> threads (100 clients). Ran this many times with a single instance of
> RoomNumber Annotator service (from uima as examples). No hangs. Can you try
> to run your application with a simple UIMA AS service like RoomNumber
> Annotator. You can find the deployment descriptor for it in the
> UIMA_HOME/examples/deploy/as.Its called Deploy_RoomNumberAnnotator.xml.
>
>
>
> On Tue, Dec 7, 2010 at 4:54 PM, Eddie Epstein <ea...@gmail.com> wrote:
>
>> 2010/12/7 Dietmar Gräbner <d....@gmail.com>:
>> > I wrote a test client creating multiple threads. Each thread
>> > instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>> > aggregate with the sendAndReceiveCAS() call. When running the program
>> > with e.g. 100 Threads the client gets stuck after processing X calls.
>>
>> FWIW, a similar multithreaded client scenario that has been used with
>> no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
>> with big enough casPool and have each thread call sendAndReceiveCAS()
>> using the common API object.
>>
>> Eddie
>>
>

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Jaroslaw Cwiklik <ui...@gmail.com>.
I've created a multi-threaded application that uses a
BaseUIMAAsynchronousEngine_impl instance per thread. I scaled this up to 100
threads (100 clients) and ran it many times with a single instance of the
RoomNumber Annotator service (from the UIMA AS examples). No hangs. Can you try
to run your application with a simple UIMA AS service like the RoomNumber
Annotator? You can find the deployment descriptor for it in
UIMA_HOME/examples/deploy/as. It's called Deploy_RoomNumberAnnotator.xml.



On Tue, Dec 7, 2010 at 4:54 PM, Eddie Epstein <ea...@gmail.com> wrote:

> 2010/12/7 Dietmar Gräbner <d....@gmail.com>:
> > I wrote a test client creating multiple threads. Each thread
> > instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
> > aggregate with the sendAndReceiveCAS() call. When running the program
> > with e.g. 100 Threads the client gets stuck after processing X calls.
>
> FWIW, a similar multithreaded client scenario that has been used with
> no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
> with a big enough casPool and have each thread call sendAndReceiveCAS()
> using the common API object.
>
> Eddie
>

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Eddie Epstein <ea...@gmail.com>.
2010/12/7 Dietmar Gräbner <d....@gmail.com>:
> I wrote a test client creating multiple threads. Each thread
> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
> aggregate with the sendAndReceiveCAS() call. When running the program
> with e.g. 100 Threads the client gets stuck after processing X calls.

FWIW, a similar multithreaded client scenario that has been used with
no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
with a big enough casPool and have each thread call sendAndReceiveCAS()
using the common API object.

Eddie
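
The shape of this suggestion can be sketched with plain JDK classes. This is
a stand-in and not UIMA AS code: SharedClientSketch, sendAndReceive() and
the StringBuilder pool are hypothetical names that only mimic one shared
client object with a fixed-size pool of reusable buffers. The real client
would be a single BaseUIMAAsynchronousEngine_impl with its casPool. The
sketch assumes each call borrows a pooled object, blocks until its result is
ready, and then returns the object to the pool.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for one shared client object; NOT the UIMA AS API. */
final class SharedClientSketch {
    // Stand-in for the CAS pool: a fixed number of reusable buffers.
    private final BlockingQueue<StringBuilder> pool;
    private final AtomicInteger processed = new AtomicInteger();

    SharedClientSketch(int poolSize) {
        pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new StringBuilder());
        }
    }

    /** Borrows a buffer, does the (simulated) blocking call, returns the buffer. */
    String sendAndReceive(String text) {
        StringBuilder buf;
        try {
            buf = pool.take(); // blocks while every pooled buffer is in use
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted waiting for a buffer", e);
        }
        try {
            buf.setLength(0);
            buf.append(text.trim()); // placeholder for the remote processing
            processed.incrementAndGet();
            return buf.toString();
        } finally {
            pool.add(buf); // always give the buffer back, or later callers block
        }
    }

    int processedCount() {
        return processed.get();
    }

    /** Starts the given number of threads, all calling ONE shared client. */
    static int runWorkers(int threads, int callsPerThread, int poolSize) {
        SharedClientSketch client = new SharedClientSketch(poolSize);
        ExecutorService workers = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            workers.execute(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    client.sendAndReceive("  some document text  ");
                }
            });
        }
        workers.shutdown();
        try {
            if (!workers.awaitTermination(30, TimeUnit.SECONDS)) {
                workers.shutdownNow();
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return client.processedCount();
    }

    public static void main(String[] args) {
        System.out.println("calls completed: " + runWorkers(100, 10, 100));
    }
}
```

With a pool smaller than the number of threads the calls still complete in
this sketch, but threads queue up waiting for a free buffer. That is the
reason for sizing the pool to the number of concurrent callers.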

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Jaroslaw Cwiklik <ui...@gmail.com>.
I see that there are a few UIMA AS threads stuck waiting for a Collection
Process Complete reply message.
Can you share the test program? I could deploy it against a simple service
and see if I can replicate the hang.

Regards, Jerry C



2010/12/7 Dietmar Gräbner <d....@gmail.com>

> Hi Jaroslaw,
>
> I use ActiveMQ 4.1.1, but I'll try the newer version.
> No OutOfMemoryError occurred; the CASes are actually pretty small.
>
> The attachment contains the thread dump. I hope that works on the user list!
>
>
> Thx for the fast reply
>
> Best Regards
>
> Dietmar
>
> On Tue, Dec 7, 2010 at 5:51 PM, Jaroslaw Cwiklik <ui...@gmail.com> wrote:
> > Hi, can you provide a full trace from a thread that is stuck on
> > AbstractQueuedSynchronizer?
> > Which version of ActiveMQ are you using? If 4.1.1, perhaps you can try a
> > newer version like 5.3.2. Once you download the new AMQ, just set
> > ACTIVEMQ_HOME to point to the AMQ install dir.
> >
> > The fact that there are msgs in the temp reply queues suggests that AMQ
> > MessageListener threads are stuck somewhere and not processing incoming
> > reply msgs. Perhaps there is a synchronization problem in either UIMA AS or
> > ActiveMQ.
> >
> > Have you checked the client log and/or stdout for an OutOfMemoryError? You
> > seem to be using quite a few CASes. Check the JVM memory settings just in
> > case.
> >
> > Regards, Jerry C
> >
> > 2010/12/7 Dietmar Gräbner <d....@gmail.com>
> >
> >> Hi,
> >>
> >> I hope you can help me with a problem I am struggling with for quite a
> >> while now.
> >>
> >> I wrote a test client creating multiple threads. Each thread
> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
> >> aggregate with the sendAndReceiveCAS() call. When running the program
> >> with e.g. 100 Threads the client gets stuck after processing X calls.
> >>
> >> Environment:
> >> -client:
> >> - - uses the uima  2.3.0-incubating
> >> - - the client is a modified copy of the RunRemoteAE.java uima
> >> provides as an example
> >> - - no timeouts configured except MetaTimeout
> >>
> >> -server:
> >> - - server runs on a different machine
> >> - - 2.3.0 with JMX configured to monitor activeMq and the Uima Services
> >> - - Deployment Descriptor
> >> - - - the uima service has two delegates: WhitespaceTokenizer and a
> >> SentenceAnnotator
> >> - - - no extra error handling configured (see at the end of the email)
> >>
> >>
> >>
> >> Detailed problem description:
> >>
> >> - The UIMA service reports in the JMX stats that all documents have
> >> been processed. The individual delegate logs report that the process
> >> method has finished.
> >> - Some of the client threads are blocked in
> >> AbstractQueuedSynchronizer. The others finished successfully.
> >> - The temp queues in ActiveMQ still exist. Some contain messages,
> >> and the enqueue and dequeue counts differ.
> >> - I experimented with different configuration parameters in the AS
> >> deployment descriptor, though I don't think that this is the problem.
> >> - Everything works fine with a primitive UIMA service.
> >> - I set the log settings to ALL, but couldn't find any exceptions.
> >>
> >>
> >> Thank you in advance.
> >>
> >> Best regards,
> >>
> >> Dietmar
> >>
> >>
> >> -----
> >> the most simple descriptor I used (I also tried 1 instance per used
> >> thread):
> >>
> >> <analysisEngineDeploymentDescription
> >>  xmlns="http://uima.apache.org/resourceSpecifier">
> >>  <name>SentenceAnnotator</name>
> >>  <description>Deploys SentenceAnnotator AE</description>
> >>  <deployment protocol="jms" provider="activemq" >
> >>    <service>
> >>      <inputQueue endpoint="SentenceAnnotatorQueue"
> >> brokerURL="${defaultBrokerURL}"/>
> >>      <topDescriptor>
> >>         <import location="AAE_WSTokenizerSentenceAnnotator.xml"/>
> >>      </topDescriptor>
> >>      <analysisEngine async="true" key="SentenceAnnotator"
> >> internalReplyQueueScaleout="1" inputQueueScaleout="1">
> >>              <delegates>
> >>                <analysisEngine key="SentenceAnnotator">
> >>                  <scaleout numberOfInstances="1"/>
> >>                </analysisEngine>
> >>                <analysisEngine key ="WhitespaceTokenizer">
> >>                  <scaleout numberOfInstances="1"/>
> >>                </analysisEngine>
> >>              </delegates>
> >>      </analysisEngine>
> >>    </service>
> >>  </deployment>
> >>
> >
>

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Dietmar Gräbner <d....@gmail.com>.
Hi Jaroslaw,

I use ActiveMQ 4.1.1, but I'll try the newer version.
No OutOfMemoryError occurred; the CASes are actually pretty small.

The attachment contains the thread dump. I hope that works on the user list!


Thx for the fast reply

Best Regards

Dietmar

On Tue, Dec 7, 2010 at 5:51 PM, Jaroslaw Cwiklik <ui...@gmail.com> wrote:
> Hi, can you provide a full trace from a thread that is stuck on
> AbstractQueuedSynchronizer?
> Which version of ActiveMQ are you using? If 4.1.1, perhaps you can try a
> newer version like 5.3.2. Once you download the new AMQ, just set
> ACTIVEMQ_HOME to point to the AMQ install dir.
>
> The fact that there are msgs in the temp reply queues suggests that AMQ
> MessageListener threads are stuck somewhere and not processing incoming
> reply msgs. Perhaps there is a synchronization problem in either UIMA AS or
> ActiveMQ.
>
> Have you checked the client log and/or stdout for an OutOfMemoryError? You
> seem to be using quite a few CASes. Check the JVM memory settings just in case.
>
> Regards, Jerry C
>
> 2010/12/7 Dietmar Gräbner <d....@gmail.com>
>
>> Hi,
>>
>> I hope you can help me with a problem I am struggling with for quite a
>> while now.
>>
>> I wrote a test client creating multiple threads. Each thread
>> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>> aggregate with the sendAndReceiveCAS() call. When running the program
>> with e.g. 100 Threads the client gets stuck after processing X calls.
>>
>> Environment:
>> -client:
>> - - uses the uima  2.3.0-incubating
>> - - the client is a modified copy of the RunRemoteAE.java uima
>> provides as an example
>> - - no timeouts configured except MetaTimeout
>>
>> -server:
>> - - server runs on a different machine
>> - - 2.3.0 with JMX configured to monitor activeMq and the Uima Services
>> - - Deployment Descriptor
>> - - - the uima service has two delegates: WhitespaceTokenizer and a
>> SentenceAnnotator
>> - - - no extra error handling configured (see at the end of the email)
>>
>>
>>
>> Detailed problem description:
>>
>> - The UIMA service reports in the JMX stats that all documents have
>> been processed. The individual delegate logs report that the process
>> method has finished.
>> - Some of the client threads are blocked in
>> AbstractQueuedSynchronizer. The others finished successfully.
>> - The temp queues in ActiveMQ still exist. Some contain messages,
>> and the enqueue and dequeue counts differ.
>> - I experimented with different configuration parameters in the AS
>> deployment descriptor, though I don't think that this is the problem.
>> - Everything works fine with a primitive UIMA service.
>> - I set the log settings to ALL, but couldn't find any exceptions.
>>
>>
>> Thank you in advance.
>>
>> Best regards,
>>
>> Dietmar
>>
>>
>> -----
>> the most simple descriptor I used (I also tried 1 instance per used
>> thread):
>>
>> <analysisEngineDeploymentDescription
>>  xmlns="http://uima.apache.org/resourceSpecifier">
>>  <name>SentenceAnnotator</name>
>>  <description>Deploys SentenceAnnotator AE</description>
>>  <deployment protocol="jms" provider="activemq" >
>>    <service>
>>      <inputQueue endpoint="SentenceAnnotatorQueue"
>> brokerURL="${defaultBrokerURL}"/>
>>      <topDescriptor>
>>         <import location="AAE_WSTokenizerSentenceAnnotator.xml"/>
>>      </topDescriptor>
>>      <analysisEngine async="true" key="SentenceAnnotator"
>> internalReplyQueueScaleout="1" inputQueueScaleout="1">
>>              <delegates>
>>                <analysisEngine key="SentenceAnnotator">
>>                  <scaleout numberOfInstances="1"/>
>>                </analysisEngine>
>>                <analysisEngine key ="WhitespaceTokenizer">
>>                  <scaleout numberOfInstances="1"/>
>>                </analysisEngine>
>>              </delegates>
>>      </analysisEngine>
>>    </service>
>>  </deployment>
>>
>

Re: UimaAS blocks when accessing a queue with multiple clients concurrently

Posted by Jaroslaw Cwiklik <ui...@gmail.com>.
Hi, can you provide a full trace from a thread that is stuck on
AbstractQueuedSynchronizer?
Which version of ActiveMQ are you using? If 4.1.1, perhaps you can try a
newer version like 5.3.2. Once you download the new AMQ, just set ACTIVEMQ_HOME
to point to the AMQ install dir.

The fact that there are msgs in the temp reply queues suggests that AMQ
MessageListener threads are stuck somewhere and not processing incoming
reply msgs. Perhaps there is a synchronization problem in either UIMA AS or
ActiveMQ.

Have you checked the client log and/or stdout for an OutOfMemoryError? You
seem to be using quite a few CASes. Check the JVM memory settings just in case.

Regards, Jerry C
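
One way to collect such a trace from inside the client JVM is sketched
below, using only the standard java.lang.management API. The class name
StuckThreadReport and the method tracesContaining() are made up for this
example and are not part of UIMA AS or ActiveMQ. The sketch assumes it runs
in the same JVM as the hung client threads, for example from a watchdog
thread. It prints the full stack of every thread that has a frame in a
class whose name contains the given fragment.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: lists threads whose stack passes through a given class. */
final class StuckThreadReport {

    /** Returns one formatted stack trace per live thread with a matching frame. */
    static List<String> tracesContaining(String classNameFragment) {
        List<String> reports = new ArrayList<>();
        // (false, false): skip lock-ownership details, which not every JVM supports.
        ThreadInfo[] infos =
                ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
        for (ThreadInfo info : infos) {
            if (info == null || !hasFrameIn(info, classNameFragment)) {
                continue;
            }
            StringBuilder sb = new StringBuilder();
            sb.append('"').append(info.getThreadName()).append("\" state=")
              .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            reports.add(sb.toString());
        }
        return reports;
    }

    private static boolean hasFrameIn(ThreadInfo info, String classNameFragment) {
        for (StackTraceElement frame : info.getStackTrace()) {
            if (frame.getClassName().contains(classNameFragment)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Run standalone this usually prints nothing, since no thread is parked yet.
        for (String report : tracesContaining("AbstractQueuedSynchronizer")) {
            System.out.println(report);
        }
    }
}
```

From outside the process, the JDK's jstack tool run against the client's
process id produces a comparable dump of all threads.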

2010/12/7 Dietmar Gräbner <d....@gmail.com>

> Hi,
>
> I hope you can help me with a problem I am struggling with for quite a
> while now.
>
> I wrote a test client creating multiple threads. Each thread
> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
> aggregate with the sendAndReceiveCAS() call. When running the program
> with e.g. 100 Threads the client gets stuck after processing X calls.
>
> Environment:
> -client:
> - - uses the uima  2.3.0-incubating
> - - the client is a modified copy of the RunRemoteAE.java uima
> provides as an example
> - - no timeouts configured except MetaTimeout
>
> -server:
> - - server runs on a different machine
> - - 2.3.0 with JMX configured to monitor activeMq and the Uima Services
> - - Deployment Descriptor
> - - - the uima service has two delegates: WhitespaceTokenizer and a
> SentenceAnnotator
> - - - no extra error handling configured (see at the end of the email)
>
>
>
> Detailed problem description:
>
> - The UIMA service reports in the JMX stats that all documents have
> been processed. The individual delegate logs report that the process
> method has finished.
> - Some of the client threads are blocked in
> AbstractQueuedSynchronizer. The others finished successfully.
> - The temp queues in ActiveMQ still exist. Some contain messages,
> and the enqueue and dequeue counts differ.
> - I experimented with different configuration parameters in the AS
> deployment descriptor, though I don't think that this is the problem.
> - Everything works fine with a primitive UIMA service.
> - I set the log settings to ALL, but couldn't find any exceptions.
>
>
> Thank you in advance.
>
> Best regards,
>
> Dietmar
>
>
> -----
> the most simple descriptor I used (I also tried 1 instance per used
> thread):
>
> <analysisEngineDeploymentDescription
>  xmlns="http://uima.apache.org/resourceSpecifier">
>  <name>SentenceAnnotator</name>
>  <description>Deploys SentenceAnnotator AE</description>
>  <deployment protocol="jms" provider="activemq" >
>    <service>
>      <inputQueue endpoint="SentenceAnnotatorQueue"
> brokerURL="${defaultBrokerURL}"/>
>      <topDescriptor>
>         <import location="AAE_WSTokenizerSentenceAnnotator.xml"/>
>      </topDescriptor>
>      <analysisEngine async="true" key="SentenceAnnotator"
> internalReplyQueueScaleout="1" inputQueueScaleout="1">
>              <delegates>
>                <analysisEngine key="SentenceAnnotator">
>                  <scaleout numberOfInstances="1"/>
>                </analysisEngine>
>                <analysisEngine key ="WhitespaceTokenizer">
>                  <scaleout numberOfInstances="1"/>
>                </analysisEngine>
>              </delegates>
>      </analysisEngine>
>    </service>
>  </deployment>
>