Posted to user@curator.apache.org by Sznajder ForMailingList <bs...@gmail.com> on 2013/11/14 22:18:58 UTC

Multiple consumers on a single server - strange behavior.

Hi

I ran a short test as follows:

- I have a ZooKeeper quorum of 3 nodes.
- I wrote a class, QueueProducer, that uses a Curator distributed queue and
produces items (random integers) all the time: when the queue is 10% full,
it creates new items.
- I wrote a simple class using a Curator QueueConsumer, which simply logs
"consumed item i".

I tested several combinations:
- running the consumers on one, two or three nodes;
- running one or more consumers in parallel on a given node.


But here is my question: I see some very strange behavior when I run
several consumers in parallel on a node. For example, with 5 consumers per
node on 3 nodes, throughput is **very** slow. Looking at my log, I see that
most of the consumers are idle most of the time...

Am I making a mistake somewhere?
I was expecting throughput to increase with the number of consumers, and I
am surprised to see the opposite...

Thanks a lot

Benjamin

Re: Multiple consumers on a single server - strange behavior.

Posted by John Vines <vi...@apache.org>.
I don't know the finer details, but I'm not surprised by your results.
Having 15 consumers of a single, highly synchronous queue is hard to
maintain, so it makes sense that each consumer would spend more time
idling. I think ultimately you want to scale the number of consumers to the
complexity of the processing that gets done after an item is removed from
the queue, not arbitrarily scaling up.
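John's point can be made concrete with a deliberately simplified bottleneck model. This is an assumption for illustration, not something measured in this thread: suppose each item costs `c` ms of work that only one consumer can do at a time (the serialized coordination on the shared queue) plus `p` ms of processing that consumers can do in parallel. Then n consumers can never exceed 1000/c items per second, however large n gets. The model ignores contention overhead between competing consumers, which in practice can make throughput drop as consumers are added. All names below are hypothetical.

```java
// Simplified bottleneck model (hypothetical): each item needs cMillis of
// serialized coordination plus pMillis of parallelizable processing.
final class ConsumerScalingModel {

    // Items per second achievable with n consumers under the model.
    static double throughput(int n, double cMillis, double pMillis) {
        double parallelLimit = n * 1000.0 / (cMillis + pMillis); // consumers never wait
        double serialLimit = 1000.0 / cMillis;                  // serialized step saturated
        return Math.min(parallelLimit, serialLimit);
    }

    // Smallest number of consumers that saturates the serialized step.
    static int saturatingConsumers(double cMillis, double pMillis) {
        return (int) Math.ceil((cMillis + pMillis) / cMillis);
    }

    public static void main(String[] args) {
        double c = 10.0; // assumed serialized cost per item, in ms
        for (double p : new double[] {0.0, 90.0}) {
            System.out.printf("p=%.0f ms: 1 consumer -> %.1f/s, 15 consumers -> %.1f/s, saturates at %d%n",
                    p, throughput(1, c, p), throughput(15, c, p), saturatingConsumers(c, p));
        }
    }
}
```

With processing as trivial as a single log line (p close to 0), the model says one consumer already saturates the serialized step, so the other 14 mostly wait; with p = 90 ms, about 10 consumers are useful. That is the sense in which the number of consumers should follow the processing cost.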


Re: Multiple consumers on a single server - strange behavior.

Posted by Sznajder ForMailingList <bs...@gmail.com>.
If anyone has a hint on this subject, it would be very useful to me...

Thanks

Benjamin



Re: Multiple consumers on a single server - strange behavior.

Posted by Jordan Zimmerman <jo...@jordanzimmerman.com>.
I was looking at QueueProducer:

// We register to the listener for monitoring the number of elements
// in the queue
framework.getCuratorListenable().addListener(new CuratorListener() {
    @Override
    public void eventReceived(final CuratorFramework framework_,
            CuratorEvent event) throws Exception {
        if (event.getPath() != null && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
            // this also restores the notification
            List<String> children = framework_.getChildren()
                    .watched().forPath(Utils.CRAWL_QUEUE_PATH);
            if (children.size() <= QUEUE_SIZE/2) {
                addQueueContent(QUEUE_SIZE - children.size());
            }
        }
    }
});
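Following Tech Note 1, the concern with this listener is that it does blocking work where it is called: a synchronous getChildren() and then, whenever the queue is at most half full, at least 50,000 `queue.put()` calls. While that runs, other events for the listener are likely to queue up behind it. The JDK-only sketch below illustrates the hand-off pattern under stated assumptions: a single-thread executor stands in for the thread that delivers listener events, the listener only decides how many items are needed, and the blocking refill is passed to a separate pool. All names are hypothetical and no Curator classes are used. Within Curator, the two options Jordan mentions are issuing the call with `inBackground()` or passing an `Executor` when registering the listener (verify both against the Curator version in use).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: no Curator classes, only the hand-off pattern.
final class OffloadingListenerSketch {
    static final int QUEUE_SIZE = 100_000;

    // Same rule as QueueProducer: refill to QUEUE_SIZE once at most half full.
    static int itemsToAdd(int currentSize) {
        return currentSize <= QUEUE_SIZE / 2 ? QUEUE_SIZE - currentSize : 0;
    }

    // Stands in for the single thread that delivers listener events.
    final ExecutorService eventThread = Executors.newSingleThreadExecutor();
    // Does the slow refill so the event thread is never blocked by it.
    final ExecutorService refillPool = Executors.newSingleThreadExecutor();
    final AtomicInteger added = new AtomicInteger();

    // Runs on the event thread: cheap decision only, slow work handed off.
    void onChildrenCount(int currentSize) {
        int n = itemsToAdd(currentSize);
        if (n > 0) {
            refillPool.submit(() -> addQueueContent(n));
        }
    }

    // Placeholder for the loop of blocking queue.put(...) calls.
    void addQueueContent(int numberOfItems) {
        added.addAndGet(numberOfItems);
    }

    // Drains the event thread first so every hand-off is queued before the pool stops.
    void shutdown() {
        try {
            eventThread.shutdown();
            eventThread.awaitTermination(5, TimeUnit.SECONDS);
            refillPool.shutdown();
            refillPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        OffloadingListenerSketch s = new OffloadingListenerSketch();
        s.eventThread.submit(() -> s.onChildrenCount(40_000)); // refill needed
        s.eventThread.submit(() -> s.onChildrenCount(90_000)); // no refill
        s.shutdown();
        System.out.println(s.added.get());
    }
}
```

Because the refill runs off the event thread, further events can still be delivered while it is in progress. A real version would also need to stop two back-to-back low-water events from both scheduling a refill, which this sketch does not do.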

On Nov 17, 2013, at 1:53 PM, Sznajder ForMailingList <bs...@gmail.com> wrote:

> Hi Jordan,
> 
> Thanks again for your quick answer.
> 
> I want to make sure I understand your hint.
> 
> Do you mean that I should change the following method:
> 
>             @Override
>             public void consumeMessage(CrawlUrl url) throws Exception {
>                 try {
>                     LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "["+id+ "-" + MyQueueConsumer.this.name+ "] processed " + url.url);
>                     MyQueueConsumer.this.numberOfProcessedURL++;
>                 } catch (Exception e) {
>                     LOG.error( "["+id+ "-" + MyQueueConsumer.this.name+ "]" + e.getMessage() + " for url " + url.url );
>                 } 
>             }
> 
> 
> 
> or the code in the QueueProducer?
> 
> In the latter case (the code in the QueueProducer), I don't think it will solve the problem: my logs show the consumers sitting idle even when the producer is not doing anything...
> 
> Best regards
> 
> Benjamin
> 
> 
> 
> 
> On Sun, Nov 17, 2013 at 11:33 PM, Jordan Zimmerman <jo...@jordanzimmerman.com> wrote:
> I’ll look further at this, but the first thing that I notice is that you are doing “work” in your Curator Listener. Please read Curator Tech Note 1:
> 
> 	https://cwiki.apache.org/confluence/display/CURATOR/TN1
> 
> The quickest fix would be to do the getChildren() as a background operation. Alternatively, you can pass in a thread pool when registering the listener.
> 
> -Jordan
> 
> On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <bs...@gmail.com> wrote:
> 
>> First of all, thank you for your answer.
>> 
>> Here is the simple code I used.
>> 
>> The producer and the queue consumer are given in the classes below.
>> 
>> Every 5 minutes, I print the number of processed items, and I see drastic differences between the consumers:
>> 
>> 
>> 
>> Producer:
>> =-=-=-=-=
>> 
>> package com.zk;
>> 
>> import java.io.Closeable;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>> import java.util.List;
>> 
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.api.CuratorEvent;
>> import org.apache.curator.framework.api.CuratorListener;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> 
>> public class QueueProducer implements Closeable {
>>     
>>     final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>>     
>>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>     
>>     protected static final String PATH = "/test_queue";
>> 
>>     protected static final String LOCK_PATH = "/test_lock_queue";
>>     
>>     private DistributedQueue<CrawlUrl> queue;
>>     
>>     private static final int QUEUE_SIZE = 100000;
>>     
>>     private int items;
>> 
>>     public QueueProducer(CuratorFramework framework) throws Exception {
>>         LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>>         System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>>         this.queue = Utils.newDistributedQueue(framework,
>>                 Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
>>         this.queue.start();
>>         addQueueContent(QUEUE_SIZE);
>>         System.out.println("Done with the initial init");
>> 
>> 
>>         // We register to the listener for monitoring the number of elements
>>         // in the queue
>>         framework.getCuratorListenable().addListener(new CuratorListener() {
>>             @Override
>>             public void eventReceived(final CuratorFramework framework_,
>>                     CuratorEvent event) throws Exception {
>>                 if (event.getPath() != null    && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>>                     // this also restores the notification
>>                     List<String> children = framework_.getChildren()
>>                             .watched().forPath(Utils.CRAWL_QUEUE_PATH);
>>                     if (children.size() <= QUEUE_SIZE/2) {
>>                         addQueueContent(QUEUE_SIZE - children.size());
>>                     }
>>                 }
>>             }
>>         });
>> 
>> 
>>         while (true) {
>>             List<String> children = framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>>             if (children.size() <= QUEUE_SIZE/2) {
>>                 LOG.info(dateFormat.format(new Date()) + " - In the while(true) - We call for size " + children.size());
>>                 addQueueContent(QUEUE_SIZE - children.size());
>>             }            
>>                 
>>             Thread.sleep(5000);
>> 
>>         }
>>     }
>> 
>>     void addQueueContent(int numberOfItems) {
>>         LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
>>         for (int i = 0; i < numberOfItems; i++) {
>>             try {
>>                 CrawlUrl url = new CrawlUrl(""+this.items++);
>>                 this.queue.put(url);
>>             } catch (Exception e) {
>>                 LOG.error("Caught an error when adding item " + i + " in addQueueContent()", e);
>>             }
>>         }
>>     }
>>     
>>     public static void main(String[] args) {
>>         CrawlerPropertyFile props;
>>         try {
>>             props = new CrawlerPropertyFile(args[0]);
>> 
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>> 
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>> 
>>             @SuppressWarnings("unused")
>>             QueueProducer producer = new QueueProducer(framework);
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>> 
>>     }
>> 
>>     @Override
>>     public void close() throws IOException {
>>         this.queue.close();
>>     }
>>     
>>     
>> 
>> }
>> 
>> 
>> 
>> 
>> Consumer 
>> =-=-=-=-=-
>> 
>> package com.zk;
>> 
>> import java.io.Closeable;
>> import java.io.File;
>> import java.io.FileWriter;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>> 
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.framework.recipes.queue.QueueConsumer;
>> import org.apache.curator.framework.state.ConnectionState;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> 
>> 
>> public class MyQueueConsumer implements Closeable{
>>     
>>     private DistributedQueue<CrawlUrl> queue;
>> 
>>     String name;
>> 
>>     String id;
>> 
>>     FileWriter timeCounter;
>>     
>>     final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);
>>     
>>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>     
>>     int numberOfProcessedURL;
>> 
>>     private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>>     
>>     private class FileWriterThread extends Thread {
>> 
>>         public FileWriterThread() {
>>             // empty ctor
>>         }
>> 
>>         @Override
>>         public void run() {
>>             // We write the stats:
>> 
>>             try {
>>                 while (true) {
>>                     MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "+
>>                             "[numberOfProcessed="+MyQueueConsumer.this.numberOfProcessedURL    +"]\n") ;
>>                     MyQueueConsumer.this.timeCounter.flush();
>>                 
>>                     // Sleeps 5 minutes
>>                     Thread.sleep(300000);
>>                 }
>>             } catch (Exception e) {
>>                 // TODO Auto-generated catch block
>>                 e.printStackTrace();
>>             }
>>         }
>>     }
>>     
>>     
>>     public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
>>         this.id = id;
>>         this.name = java.net.InetAddress.getLocalHost().getHostName();
>>         this.timeCounter = new FileWriter(new File("MyQueueConsumer_"+ this.name + "_" +id + "_timeCounter.txt"));
>>         
>> //        this.timeCounterThread = new FileWriterThread();
>> //        this.timeCounterThread.start();
>>         this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new QueueConsumer<CrawlUrl>() {
>> 
>>             @Override
>>             public void stateChanged(CuratorFramework client, ConnectionState newState) {
>>                 System.out.println(String.format("[%s] connection state changed to %s", id, newState));
>>             }
>> 
>>             @Override
>>             public void consumeMessage(CrawlUrl url) throws Exception {
>>                 try {
>>                     LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "["+id+ "-" + MyQueueConsumer.this.name+ "] processed " + url.url);
>>                     MyQueueConsumer.this.numberOfProcessedURL++;
>>                 } catch (Exception e) {
>>                     LOG.error( "["+id+ "-" + MyQueueConsumer.this.name+ "]" + e.getMessage() + " for url " + url.url );
>>                 } 
>>             }
>> 
>>         });
>>         try {
>>             this.queue.start();
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>> 
>>     }
>>     
>>     public static void main(String[] args) {
>>         try {
>>             CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>>     
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>>     
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>     
>>             final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>>     
>>             for (int i = 0; i < queueConsumers.length; i++) {
>>                 queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>>             }
>>     
>>             Runtime.getRuntime().addShutdownHook(new Thread() {
>>                 @Override
>>                 public void run() {
>>                     // close workers
>>                     Throwable t = null;
>>                     LOG.info("We close the workers");
>>                     for (MyQueueConsumer queueConsumer : queueConsumers) {
>>                         try {
>>                             queueConsumer.close();
>>                         } catch (Throwable th) {
>>                             if (t == null) {
>>                                 t = th;
>>                             }
>>                         }
>>                     }
>>                     // throw first exception that we encountered
>>                     if (t != null) {
>>                         throw new RuntimeException("some workers failed to close", t);
>>                     }
>>                 }
>>             });
>>             
>>         }catch (Exception e ){
>>             e.printStackTrace();
>>         }
>>     }
>> 
>>     @Override
>>     public void close() throws IOException {
>>         this.queue.close();
>>     }
>> }
>> 
>> 
>> 
>> 
>> Main
>> -=-=-
>> 
>> package com.zk;
>> 
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.test.TestingServer;
>> 
>> 
>> 
>> public class QueueTestMain {
>> 
>>     /**
>>      * @param args
>>      */
>>     public static void main(String[] args) {
>>         CrawlerPropertyFile props;
>>         try {
>>             props = new CrawlerPropertyFile(args[0]);
>> 
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>> 
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>             
>> 
>>             if (args[1] != null && args[1].equalsIgnoreCase("true")) {
>>                 @SuppressWarnings("unused")
>>                 QueueProducer producer = new QueueProducer(framework);
>>             } else {
>>             
>>                 final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>>                 
>>                 for (int i = 0; i < queueConsumers.length; i++) {
>>                     queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>>                 }
>>         
>>                 Runtime.getRuntime().addShutdownHook(new Thread() {
>>                     @Override
>>                     public void run() {
>>                         // close workers
>>                         Throwable t = null;
>>                         for (MyQueueConsumer queueConsumer : queueConsumers) {
>>                             try {
>>                                 queueConsumer.close();
>>                             } catch (Throwable th) {
>>                                 if (t == null) {
>>                                     t = th;
>>                                 }
>>                             }
>>                         }
>>                         // throw first exception that we encountered
>>                         if (t != null) {
>>                             throw new RuntimeException("some workers failed to close", t);
>>                         }
>>                     }
>>                 });
>>     
>>                 
>>             }
>>         }catch (Exception e ){
>>             e.printStackTrace();
>>         }
>> 
>>     }
>> 
>> }
>> 
>> 
>> 
>> 
>> Example of output:
>> 
>> 
>> 
>> 
>> 
>> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <jo...@jordanzimmerman.com> wrote:
>> Can you produce a test that shows this? Anything else interesting in the log? Of course, there could be a bug.
>> 
>> -Jordan
>> 
>> 
>> 
> 
> 


Re: Multiple consumers on a single server - strange behavior.

Posted by Sznajder ForMailingList <bs...@gmail.com>.
Hi Jordan,

Thanks again for your quick answer.

I want to make sure I understand your hint.

Do you mean that I should change the following method:

            @Override
            public void consumeMessage(CrawlUrl url) throws Exception {
                try {
                    LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "["+id+ "-" + MyQueueConsumer.this.name+ "] processed " + url.url);
                    MyQueueConsumer.this.numberOfProcessedURL++;
                } catch (Exception e) {
                    LOG.error( "["+id+ "-" + MyQueueConsumer.this.name+ "]" + e.getMessage() + " for url " + url.url );
                }
            }



or the code in the QueueProducer?

In the latter case (the code in the QueueProducer), I don't think it will
solve the problem: my logs show the consumers sitting idle even when the
producer is not doing anything...

Best regards

Benjamin
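Whichever code ends up being changed, comparing consumers depends on the per-consumer counts being reliable. In MyQueueConsumer, `numberOfProcessedURL` is a plain `int` incremented inside consumeMessage() and meant to be read by a separate reporting thread (FileWriterThread, currently commented out), so that thread is not guaranteed to see the latest value. A minimal sketch, assuming a hypothetical `ProcessedCounter` helper, replaces it with an `AtomicLong` and a `ScheduledExecutorService` that reports both the running total and the delta since the previous report.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: a thread-safe replacement for the plain int counter.
final class ProcessedCounter {
    private final String id;
    private final AtomicLong processed = new AtomicLong();
    private long lastReported; // guarded by "this"; only touched in report()

    ProcessedCounter(String id) {
        this.id = id;
    }

    // Safe to call from the queue's consumer thread.
    void increment() {
        processed.incrementAndGet();
    }

    long total() {
        return processed.get();
    }

    // Total so far plus the number processed since the previous report.
    synchronized String report() {
        long now = processed.get();
        long delta = now - lastReported;
        lastReported = now;
        return "[" + id + "] numberOfProcessed=" + now + " (+" + delta + ")";
    }

    // Prints report() every period on a daemon thread; the caller owns shutdown.
    ScheduledExecutorService startReporting(long period, TimeUnit unit) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "processed-counter-" + id);
            t.setDaemon(true);
            return t;
        });
        ses.scheduleAtFixedRate(() -> System.out.println(report()), period, period, unit);
        return ses;
    }
}
```

In this sketch, `counter.increment()` would take the place of `numberOfProcessedURL++` in consumeMessage(), and `counter.startReporting(5, TimeUnit.MINUTES)` would replace the hand-written sleep loop. The per-report delta makes an idle consumer show up directly as `(+0)`.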




On Sun, Nov 17, 2013 at 11:33 PM, Jordan Zimmerman <
jordan@jordanzimmerman.com> wrote:

> I’ll look further at this, but the first thing that I notice is that you
> are doing “work” in your Curator Listener. Please read Curator Tech Note 1:
>
> https://cwiki.apache.org/confluence/display/CURATOR/TN1
>
> The quickest fix would be to do the getChildren() as a background
> operation. Alternatively, you can pass in a thread pool when registering
> the listener.
>
> -Jordan
>
> On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <
> bs4mailinglist@gmail.com> wrote:
>
> First at all , thank you for your answer.
>
> Here is the simple code, I used:
>
> The producer and queueconsummer are given in the class
>
> Every 5 minutes, I am printing the the number of processed items, and I
> see some drastic differences between the different consumers:
>
>
>
> Producer:
> =-=-=-=-=
>
> package com.zk;
>
> import java.io.Closeable;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.List;
>
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.api.CuratorEvent;
> import org.apache.curator.framework.api.CuratorListener;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
>
> public class QueueProducer implements Closeable {
>
>     final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>
>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>
>     protected static final String PATH = "/test_queue";
>
>     protected static final String LOCK_PATH = "/test_lock_queue";
>
>     private DistributedQueue<CrawlUrl> queue;
>
>     private static final int QUEUE_SIZE = 100000;
>
>     private int items;
>
>     public QueueProducer(CuratorFramework framework) throws Exception {
>         LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is
> a QueueProducer");
>
> System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is
> a QueueProducer");
>         this.queue = Utils.newDistributedQueue(framework,
>                 Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
>         this.queue.start();
>         addQueueContent(QUEUE_SIZE);
>         System.out.println("Done with the initial init");
>
>
>         // We register to the listener for monitoring the number of
> elements
>         // in the queue
>         framework.getCuratorListenable().addListener(new CuratorListener()
> {
>             @Override
>             public void eventReceived(final CuratorFramework framework_,
>                     CuratorEvent event) throws Exception {
>                 if (event.getPath() != null    &&
> event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>                     // this also restores the notification
>                     List<String> children = framework_.getChildren()
>                             .watched().forPath(Utils.CRAWL_QUEUE_PATH);
>                     if (children.size() <= QUEUE_SIZE/2) {
>                         addQueueContent(QUEUE_SIZE - children.size());
>                     }
>                 }
>             }
>         });
>
>
>         while (true) {
>             List<String> children =
> framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>             if (children.size() <= QUEUE_SIZE/2) {
>                 LOG.info(dateFormat.format(new Date()) + " - In the
> while(true) - We call for size " + children.size());
>                 addQueueContent(QUEUE_SIZE - children.size());
>             }
>
>             Thread.sleep(5000);
>
>         }
>     }
>
>     void addQueueContent(int numberOfItems) {
>         LOG.info(dateFormat.format(new Date()) + " - addQueueContent " +
> numberOfItems);
>         for (int i = 0; i < numberOfItems; i++) {
>             try {
>                 CrawlUrl url = new CrawlUrl(""+this.items++);
>                 this.queue.put(url);
>             } catch (Exception e) {
>                 LOG.error ("Caught an error when adding the item " + i + "
> in the initQueueContent()");
>             }
>         }
>     }
>
>     public static void main(String[] args) {
>         CrawlerPropertyFile props;
>         try {
>             props = new CrawlerPropertyFile(args[0]);
>
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
>
>             final CuratorFramework framework =
> Utils.newFramework(connectString);
>             framework.start();
>
>             @SuppressWarnings("unused")
>             QueueProducer producer = new QueueProducer(framework);
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>
>     }
>
>     @Override
>     public void close() throws IOException {
>         this.queue.close();
>     }
>
>
>
> }
>
>
>
>
> Consumer
> =-=-=-=-=-
>
> package com.zk;
>
> import java.io.Closeable;
> import java.io.File;
> import java.io.FileWriter;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
>
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.framework.recipes.queue.QueueConsumer;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
>
>
> public class MyQueueConsumer implements Closeable{
>
>     private DistributedQueue<CrawlUrl> queue;
>
>     String name;
>
>     String id;
>
>     FileWriter timeCounter;
>
>     final static Logger LOG =
> LoggerFactory.getLogger(MyQueueConsumer.class);
>
>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>
>     int numberOfProcessedURL;
>
>     private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>
>     private class FileWriterThread extends Thread {
>
>         public FileWriterThread() {
>             // empty ctor
>         }
>
>         @Override
>         public void run() {
>             // We write the stats:
>
>             try {
>                 while (true) {
>
> MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "+
>
> "[numberOfProcessed="+MyQueueConsumer.this.numberOfProcessedURL    +"]\n") ;
>                     MyQueueConsumer.this.timeCounter.flush();
>
>                     // Sleeps 5 minutes
>                     Thread.sleep(300000);
>                 }
>             } catch (Exception e) {
>                 // TODO Auto-generated catch block
>                 e.printStackTrace();
>             }
>         }
>     }
>
>
>     public MyQueueConsumer(CuratorFramework framework, final String id)
> throws Exception {
>         this.id = id;
>         this.name = java.net.InetAddress.getLocalHost().getHostName();
>         this.timeCounter = new FileWriter(new File("MyQueueConsumer_"+
> this.name + "_" +id + "_timeCounter.txt"));
>
> //        this.timeCounterThread = new FileWriterThread();
> //        this.timeCounterThread.start();
>         this.queue = Utils.newDistributedQueue(framework,
> Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new
> QueueConsumer<CrawlUrl>() {
>
>             @Override
>             public void stateChanged(CuratorFramework client,
> ConnectionState newState) {
>                 System.out.println(String.format("[%s] connection state
> changed to %s", id, newState));
>             }
>
>             @Override
>             public void consumeMessage(CrawlUrl url) throws Exception {
>                 try {
>                     LOG.info(dateFormat.format(new
> Date(System.currentTimeMillis())) + "["+id+ "-" +
> MyQueueConsumer.this.name <http://myqueueconsumer.this.name/>+ "]
> processed " + url.url);
>                     MyQueueConsumer.this.numberOfProcessedURL++;
>                 } catch (Exception e) {
>                     LOG.error( "["+id+ "-" + MyQueueConsumer.this.name<http://myqueueconsumer.this.name/>+
> "]" + e.getMessage() + " for url " + url.url );
>                 }
>             }
>
>         });
>         try {
>             this.queue.start();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>
>     }
>
>     public static void main(String[] args) {
>         try {
>             CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
>
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
>
>             final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>
>             for (int i = 0; i < queueConsumers.length; i++) {
>                 queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>             }
>
>             Runtime.getRuntime().addShutdownHook(new Thread() {
>                 @Override
>                 public void run() {
>                     // close workers
>                     Throwable t = null;
>                     LOG.info("We close the workers");
>                     for (MyQueueConsumer queueConsumer : queueConsumers) {
>                         try {
>                             queueConsumer.close();
>                         } catch (Throwable th) {
>                             if (t == null) {
>                                 t = th;
>                             }
>                         }
>                     }
>                     // throw first exception that we encountered
>                     if (t != null) {
>                         throw new RuntimeException("some workers failed to close", t);
>                     }
>                 }
>             });
>
>         }catch (Exception e ){
>             e.printStackTrace();
>         }
>     }
>
>     @Override
>     public void close() throws IOException {
>         this.queue.close();
>     }
> }
>
>
>
>
> Main
> -=-=-
>
> package com.zk;
>
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.test.TestingServer;
>
>
>
> public class QueueTestMain {
>
>     /**
>      * @param args
>      */
>     public static void main(String[] args) {
>         CrawlerPropertyFile props;
>         try {
>             props = new CrawlerPropertyFile(args[0]);
>
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
>
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
>
>
>             if (args[1] != null && args[1].equalsIgnoreCase("true")) {
>                 @SuppressWarnings("unused")
>                 QueueProducer producer = new QueueProducer(framework);
>             } else {
>
>                 final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>
>                 for (int i = 0; i < queueConsumers.length; i++) {
>                     queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>                 }
>
>                 Runtime.getRuntime().addShutdownHook(new Thread() {
>                     @Override
>                     public void run() {
>                         // close workers
>                         Throwable t = null;
>                         for (MyQueueConsumer queueConsumer : queueConsumers) {
>                             try {
>                                 queueConsumer.close();
>                             } catch (Throwable th) {
>                                 if (t == null) {
>                                     t = th;
>                                 }
>                             }
>                         }
>                         // throw first exception that we encountered
>                         if (t != null) {
>                             throw new RuntimeException("some workers failed to close", t);
>                         }
>                     }
>                 });
>
>
>             }
>         }catch (Exception e ){
>             e.printStackTrace();
>         }
>
>     }
>
> }
>
>
>
>
> Example of output:
>
>
>
>
>
> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <jordan@jordanzimmerman.com> wrote:
>
>> Can you produce a test that shows this? Anything else interesting in the
>> log? Of course, there could be a bug.
>>
>> -Jordan
>>
>
>

Re: Multiple consumers on a single server - strange behavior.

Posted by Jordan Zimmerman <jo...@jordanzimmerman.com>.
I’ll look further at this, but the first thing that I notice is that you are doing “work” in your Curator Listener. Please read Curator Tech Note 1:

	https://cwiki.apache.org/confluence/display/CURATOR/TN1

The quickest fix would be to perform the getChildren() call as a background operation. Alternatively, you can pass in a thread pool (an Executor) when registering the listener.

-Jordan
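Below is a minimal sketch of the two fixes suggested above. It assumes Curator 2.x or later, Java 8 lambdas, and the curator-test artifact for the in-process TestingServer. The class name, the /demo_queue path, and the IntConsumer `refill` are hypothetical; `refill` stands in for the producer's addQueueContent(). Option B deliberately uses the inBackground(callback, executor) overload. A background callback registered without an executor is invoked on ZooKeeper's event thread, so refilling tens of thousands of items inside it would block that thread just as the original listener did.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class OffloadListenerWork {

    // Option A: give the CuratorListener its own executor. eventReceived() then runs on
    // `pool`, so the blocking getChildren() and the refill no longer stall Curator's
    // event thread. `refill` is a hypothetical stand-in for addQueueContent().
    // The initial child watch must still be armed elsewhere (e.g. the polling loop).
    static void registerOffloaded(CuratorFramework client, String queuePath,
                                  ExecutorService pool, IntConsumer refill) {
        client.getCuratorListenable().addListener((c, event) -> {
            if (queuePath.equals(event.getPath())) {
                // Re-arms the child watch, as the original listener did.
                int size = c.getChildren().watched().forPath(queuePath).size();
                refill.accept(size);
            }
        }, pool);
    }

    // Option B: run getChildren() in the background. The callback is dispatched to
    // `pool`, so the refill it triggers does not run on ZooKeeper's event thread.
    static void countInBackground(CuratorFramework client, String queuePath,
                                  ExecutorService pool, IntConsumer refill) throws Exception {
        client.getChildren().inBackground((c, event) -> {
            if (event.getType() == CuratorEventType.CHILDREN) {
                refill.accept(event.getChildren().size());
            }
        }, pool).forPath(queuePath);
    }

    // Runs Option B against an in-process TestingServer and returns the child count
    // that the background callback observed.
    static int demo() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try (TestingServer server = new TestingServer();
             CuratorFramework client = CuratorFrameworkFactory.newClient(
                     server.getConnectString(), new ExponentialBackoffRetry(1000, 3))) {
            client.start();
            for (int i = 0; i < 3; i++) {
                client.create().creatingParentsIfNeeded().forPath("/demo_queue/item-" + i);
            }
            CompletableFuture<Integer> seen = new CompletableFuture<>();
            countInBackground(client, "/demo_queue", pool, size -> seen.complete(size));
            return seen.get(10, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("children seen by the background callback: " + demo());
    }
}
```

Both options keep the event thread free to deliver watches and background results. Option A keeps the listener-driven design, while Option B removes the blocking call from the notification path entirely.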

On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <bs...@gmail.com> wrote:

> First of all, thank you for your answer.
> 
> Here is the simple code I used.
> 
> The producer and the queue consumer are each given in their own class below.
> 
> Every 5 minutes, I print the number of processed items, and I see drastic differences between the consumers:
> 
> 
> 
> Producer:
> =-=-=-=-=
> 
> package com.zk;
> 
> import java.io.Closeable;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.List;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.api.CuratorEvent;
> import org.apache.curator.framework.api.CuratorListener;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> 
> public class QueueProducer implements Closeable {
>     
>     final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>     
>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>     
>     protected static final String PATH = "/test_queue";
> 
>     protected static final String LOCK_PATH = "/test_lock_queue";
>     
>     private DistributedQueue<CrawlUrl> queue;
>     
>     private static final int QUEUE_SIZE = 100000;
>     
>     private int items;
> 
>     public QueueProducer(CuratorFramework framework) throws Exception {
>         LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>         System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>         this.queue = Utils.newDistributedQueue(framework,
>                 Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
>         this.queue.start();
>         addQueueContent(QUEUE_SIZE);
>         System.out.println("Done with the initial init");
> 
> 
>         // We register to the listener for monitoring the number of elements
>         // in the queue
>         framework.getCuratorListenable().addListener(new CuratorListener() {
>             @Override
>             public void eventReceived(final CuratorFramework framework_,
>                     CuratorEvent event) throws Exception {
>                 if (event.getPath() != null    && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>                     // this also restores the notification
>                     List<String> children = framework_.getChildren()
>                             .watched().forPath(Utils.CRAWL_QUEUE_PATH);
>                     if (children.size() <= QUEUE_SIZE/2) {
>                         addQueueContent(QUEUE_SIZE - children.size());
>                     }
>                 }
>             }
>         });
> 
> 
>         while (true) {
>             List<String> children = framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>             if (children.size() <= QUEUE_SIZE/2) {
>                 LOG.info(dateFormat.format(new Date()) + " - In the while(true) - We call for size " + children.size());
>                 addQueueContent(QUEUE_SIZE - children.size());
>             }            
>                 
>             Thread.sleep(5000);
> 
>         }
>     }
> 
>     void addQueueContent(int numberOfItems) {
>         LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
>         for (int i = 0; i < numberOfItems; i++) {
>             try {
>                 CrawlUrl url = new CrawlUrl(""+this.items++);
>                 this.queue.put(url);
>             } catch (Exception e) {
>                 LOG.error ("Caught an error when adding the item " + i + " in the initQueueContent()");
>             }
>         }
>     }
>     
>     public static void main(String[] args) {
>         CrawlerPropertyFile props;
>         try {
>             props = new CrawlerPropertyFile(args[0]);
> 
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
> 
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
> 
>             @SuppressWarnings("unused")
>             QueueProducer producer = new QueueProducer(framework);
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
> 
>     }
> 
>     @Override
>     public void close() throws IOException {
>         this.queue.close();
>     }
>     
>     
> 
> }
> 
> 
> 
> 
> Consumer 
> =-=-=-=-=-
> 
> package com.zk;
> 
> import java.io.Closeable;
> import java.io.File;
> import java.io.FileWriter;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.framework.recipes.queue.QueueConsumer;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> 
> 
> public class MyQueueConsumer implements Closeable{
>     
>     private DistributedQueue<CrawlUrl> queue;
> 
>     String name;
> 
>     String id;
> 
>     FileWriter timeCounter;
>     
>     final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);
>     
>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>     
>     int numberOfProcessedURL;
> 
>     private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>     
>     private class FileWriterThread extends Thread {
> 
>         public FileWriterThread() {
>             // empty ctor
>         }
> 
>         @Override
>         public void run() {
>             // We write the stats:
> 
>             try {
>                 while (true) {
>                     MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "+
>                             "[numberOfProcessed="+MyQueueConsumer.this.numberOfProcessedURL    +"]\n") ;
>                     MyQueueConsumer.this.timeCounter.flush();
>                 
>                     // Sleeps 5 minutes
>                     Thread.sleep(300000);
>                 }
>             } catch (Exception e) {
>                 // TODO Auto-generated catch block
>                 e.printStackTrace();
>             }
>         }
>     }
>     
>     
>     public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
>         this.id = id;
>         this.name = java.net.InetAddress.getLocalHost().getHostName();
>         this.timeCounter = new FileWriter(new File("MyQueueConsumer_"+ this.name + "_" +id + "_timeCounter.txt"));
>         
> //        this.timeCounterThread = new FileWriterThread();
> //        this.timeCounterThread.start();
>         this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new QueueConsumer<CrawlUrl>() {
> 
>             @Override
>             public void stateChanged(CuratorFramework client, ConnectionState newState) {
>                 System.out.println(String.format("[%s] connection state changed to %s", id, newState));
>             }
> 
>             @Override
>             public void consumeMessage(CrawlUrl url) throws Exception {
>                 try {
>                     LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "["+id+ "-" + MyQueueConsumer.this.name+ "] processed " + url.url);
>                     MyQueueConsumer.this.numberOfProcessedURL++;
>                 } catch (Exception e) {
>                     LOG.error( "["+id+ "-" + MyQueueConsumer.this.name+ "]" + e.getMessage() + " for url " + url.url );
>                 } 
>             }
> 
>         });
>         try {
>             this.queue.start();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
> 
>     }
>     
>     public static void main(String[] args) {
>         try {
>             CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>     
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
>     
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
>     
>             final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>     
>             for (int i = 0; i < queueConsumers.length; i++) {
>                 queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>             }
>     
>             Runtime.getRuntime().addShutdownHook(new Thread() {
>                 @Override
>                 public void run() {
>                     // close workers
>                     Throwable t = null;
>                     LOG.info("We close the workers");
>                     for (MyQueueConsumer queueConsumer : queueConsumers) {
>                         try {
>                             queueConsumer.close();
>                         } catch (Throwable th) {
>                             if (t == null) {
>                                 t = th;
>                             }
>                         }
>                     }
>                     // throw first exception that we encountered
>                     if (t != null) {
>                         throw new RuntimeException("some workers failed to close", t);
>                     }
>                 }
>             });
>             
>         }catch (Exception e ){
>             e.printStackTrace();
>         }
>     }
> 
>     @Override
>     public void close() throws IOException {
>         this.queue.close();
>     }
> }
> 
> 
> 
> 
> Main
> -=-=-
> 
> package com.zk;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.test.TestingServer;
> 
> 
> 
> public class QueueTestMain {
> 
>     /**
>      * @param args
>      */
>     public static void main(String[] args) {
>         CrawlerPropertyFile props;
>         try {
>             props = new CrawlerPropertyFile(args[0]);
> 
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
> 
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
>             
> 
>             if (args[1] != null && args[1].equalsIgnoreCase("true")) {
>                 @SuppressWarnings("unused")
>                 QueueProducer producer = new QueueProducer(framework);
>             } else {
>             
>                 final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>                 
>                 for (int i = 0; i < queueConsumers.length; i++) {
>                     queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>                 }
>         
>                 Runtime.getRuntime().addShutdownHook(new Thread() {
>                     @Override
>                     public void run() {
>                         // close workers
>                         Throwable t = null;
>                         for (MyQueueConsumer queueConsumer : queueConsumers) {
>                             try {
>                                 queueConsumer.close();
>                             } catch (Throwable th) {
>                                 if (t == null) {
>                                     t = th;
>                                 }
>                             }
>                         }
>                         // throw first exception that we encountered
>                         if (t != null) {
>                             throw new RuntimeException("some workers failed to close", t);
>                         }
>                     }
>                 });
>     
>                 
>             }
>         }catch (Exception e ){
>             e.printStackTrace();
>         }
> 
>     }
> 
> }
> 
> 
> 
> 
> Example of output:
> 
> 
> 
> 
> 
> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <jo...@jordanzimmerman.com> wrote:
> Can you produce a test that shows this? Anything else interesting in the log? Of course, there could be a bug.
> 
> -Jordan
> 
> 
> 


Re: Multiple consumers on a single server - strange behavior.

Posted by Jordan Zimmerman <jo...@jordanzimmerman.com>.
More…

Calling getChildren() just to find out how many children there are is very expensive, since the entire list of child names has to be sent back. Instead, call checkExists() on the parent path and use the returned Stat object, which has a getNumChildren() method.

-Jordan
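Below is a sketch of the checkExists() approach. It makes the same assumptions as above: Curator 2.x or later, and curator-test for TestingServer. The class, method, and path names are hypothetical. childCount() could replace the getChildren().size() calls in the producer's polling loop.

One caveat applies. A watch set through checkExists().watched() fires when the node itself is created, deleted, or has its data changed, not when children are added or removed. This swap therefore suits the 5-second polling loop better than the child-watch-driven listener.

```java
import java.util.Arrays;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.data.Stat;

public class QueueSizeViaStat {

    // checkExists() returns only the node's Stat (or null if the node does not exist),
    // so the child count comes back without transferring every child name.
    static int childCount(CuratorFramework client, String path) throws Exception {
        Stat stat = client.checkExists().forPath(path);
        return (stat == null) ? 0 : stat.getNumChildren();
    }

    // Same decision as the quoted producer loop: once the queue is at most half full,
    // return how many items are needed to top it back up to `capacity`; otherwise 0.
    static int itemsToAdd(CuratorFramework client, String queuePath, int capacity) throws Exception {
        int size = childCount(client, queuePath);
        return (size <= capacity / 2) ? capacity - size : 0;
    }

    // Small demo against an in-process TestingServer.
    static int[] demo() throws Exception {
        try (TestingServer server = new TestingServer();
             CuratorFramework client = CuratorFrameworkFactory.newClient(
                     server.getConnectString(), new ExponentialBackoffRetry(1000, 3))) {
            client.start();
            for (int i = 0; i < 5; i++) {
                client.create().creatingParentsIfNeeded().forPath("/demo_queue/item-" + i);
            }
            return new int[] {
                childCount(client, "/demo_queue"),     // 5 children
                childCount(client, "/missing"),        // 0, node absent
                itemsToAdd(client, "/demo_queue", 10), // 5 <= 10/2, so top up by 5
                itemsToAdd(client, "/demo_queue", 8)   // 5 > 8/2, so nothing to add
            };
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(demo()));
    }
}
```

Running main() should print [5, 0, 5, 0].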

On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <bs...@gmail.com> wrote:

> 
> 
> 
> 
> 
> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <jo...@jordanzimmerman.com> wrote:
> Can you produce a test that shows this? Anything else interesting in the log? Of course, there could be a bug.
> 
> -Jordan
> 
> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <bs...@gmail.com> wrote:
> 
> > Hi
> >
> > I ran a short test as follows:
> >
> > - I have a ZooKeeper quorum of 3 nodes.
> > - I wrote a QueueProducer class using Curator that produces items (random integers) continuously: when the queue is 10% full, it creates new items.
> > - I wrote a simple class using Curator's QueueConsumer that simply logs "consumed item i".
> >
> > I tested several combinations:
> > - running the consumers on one, two or three nodes.
> > - running one or more consumers in parallel on a given node.
> >
> >
> > But here is my question: I see some very strange behavior when I run several consumers in parallel on a node. For example, with 5 consumers per node on 3 nodes, throughput is **very** slow. Looking at my log, I see that most of the consumers are idle most of the time.
> >
> > Am I making a mistake somewhere?
> > I expected throughput to increase with the number of consumers, so I am surprised to see the opposite.
> >
> > Thanks a lot
> >
> > Benjamin
> 
> 
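The shutdown hook in the consumer code quoted above closes every worker but keeps only the first failure; any later ones are lost. Below is a minimal, JDK-only sketch of the same loop that attaches later failures to the first one with `Throwable.addSuppressed` (Java 7+). `CloseAll` and `closeAll` are hypothetical names. Because `MyQueueConsumer` implements `Closeable`, the hook body could call `closeAll(Arrays.asList(queueConsumers))`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class CloseAll {

    /**
     * Closes every resource, even if some of them fail.
     * The first failure is rethrown; later failures are attached to it as suppressed exceptions.
     */
    public static void closeAll(List<? extends Closeable> resources) {
        RuntimeException failure = null;
        for (Closeable resource : resources) {
            try {
                resource.close();
            } catch (Throwable t) {
                if (failure == null) {
                    failure = new RuntimeException("some workers failed to close", t);
                } else {
                    failure.addSuppressed(t);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    public static void main(String[] args) {
        // Three fake workers: one closes cleanly, two fail.
        List<Closeable> workers = Arrays.asList(
                () -> System.out.println("worker 0 closed"),
                () -> { throw new IOException("worker 1 failed"); },
                () -> { throw new IOException("worker 2 failed"); });
        try {
            closeAll(workers);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + ", cause: " + e.getCause().getMessage()
                    + ", suppressed: " + e.getSuppressed().length);
        }
    }
}
```

Every worker still gets a `close()` call, and the stack trace printed for the rethrown exception lists the suppressed failures as well.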


Re: Multiple consumers on a single server - strange behavior.

Posted by Sznajder ForMailingList <bs...@gmail.com>.
Hi Jordan

Following your advice, I moved the processing of the event out of the
listener.

However, my listener no longer receives any events.

What did I do wrong?

Thanks a lot!

package com..hrl.zk;

import java.io.Closeable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com...crawler.CrawlUrl;
import com...crawler.Utils;
import com...main.CrawlerPropertyFile;

public class QueueProducer implements Closeable {

    final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);

    final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    DistributedQueue<CrawlUrl> queue;

    private static final int QUEUE_SIZE = 100;

    int items;

    int cnt;

    private class QueueAdder extends Thread {

        private int size;
        public QueueAdder(int size) {
            System.out.println("QueueAdder " + size);
            this.size = size;
        }

        @Override
        public void run() {
            LOG.info("QueueAdder ! " );
            for (int i = 0; i < this.size; i++ ) {
                try {
                    QueueProducer.this.queue.put(new CrawlUrl("" + QueueProducer.this.items++));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            LOG.info("DONE!");
        }
    }

    public QueueProducer(CuratorFramework framework) throws Exception {
        LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
        System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
        this.queue = Utils.newDistributedQueue(framework,
                Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
        this.queue.start();
        addQueueContent(QUEUE_SIZE);
        System.out.println("Done with the initial init");


        // We register to the listener for monitoring the number of elements
        // in the queue
        framework.getCuratorListenable().addListener(new CuratorListener() {
            @Override
            public void eventReceived(final CuratorFramework framework_,
                    CuratorEvent event) throws Exception {
                LOG.info("CNT " + QueueProducer.this.cnt);
                if (QueueProducer.this.cnt++ <= QUEUE_SIZE /2) {
                    QueueAdder queueAdder = new QueueAdder(QUEUE_SIZE - QueueProducer.this.cnt);
                    queueAdder.start();
                }

            }
        });

    }



    void addQueueContent(int numberOfItems) {
        LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
        for (int i = 0; i < numberOfItems; i++) {
            try {
                CrawlUrl url = new CrawlUrl(""+this.items++);
                this.queue.put(url);
            } catch (Exception e) {
                LOG.error("Caught an error when adding the item " + i + " in the initQueueContent()");
            }
        }
    }



    @Override
    public void close() throws IOException {
        this.queue.close();
    }



}
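A likely explanation, assuming standard Curator semantics: listeners added through `getCuratorListenable()` are called when a background operation completes or when a watch set with `.watched()` fires, and ZooKeeper watches are one-time triggers. This constructor never calls `getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH)` (the earlier producer quoted further down did, and re-armed it inside the listener, as its "this also restores the notification" comment says), so no watch on the queue path is registered and the listener has nothing to report. The sketch below is a toy, single-threaded model written only with the JDK; it is not the Curator or ZooKeeper API, and `ToyZk`, `refillAndRewatch` and `run` are hypothetical names. It shows the pattern: read the children with a watch once at startup, then re-arm the watch inside the handler each time it fires, before refilling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Toy, single-threaded model of ZooKeeper's one-shot child watches.
 * Hypothetical illustration only: this is NOT the Curator or ZooKeeper API.
 */
public class OneShotWatchDemo {

    static final class ToyZk {
        private final List<String> children = new ArrayList<>();
        private List<Runnable> armedWatchers = new ArrayList<>();
        private final Deque<Runnable> eventQueue = new ArrayDeque<>();
        int delivered = 0;

        /** Reads the children; a non-null watcher fires at most once, on the next change. */
        List<String> getChildren(Runnable watcherOrNull) {
            if (watcherOrNull != null) {
                armedWatchers.add(watcherOrNull);
            }
            return new ArrayList<>(children);
        }

        void create(String child) { children.add(child); trigger(); }

        void delete(String child) { children.remove(child); trigger(); }

        int size() { return children.size(); }

        /** A change queues every armed watcher and disarms it (one-shot semantics). */
        private void trigger() {
            eventQueue.addAll(armedWatchers);
            armedWatchers = new ArrayList<>();
        }

        /** Delivers queued notifications one at a time, like a single event thread. */
        void deliverEvents() {
            while (!eventQueue.isEmpty()) {
                delivered++;
                eventQueue.poll().run();
            }
        }
    }

    static final int CAPACITY = 10;

    /** Producer rule from the thread: top up to capacity once the queue is at most half full. */
    static int refillAmount(int currentSize, int capacity) {
        return currentSize <= capacity / 2 ? capacity - currentSize : 0;
    }

    /** Watch handler: re-arm the watch first, then refill if needed. */
    static void refillAndRewatch(ToyZk zk, int[] nextItem) {
        List<String> children = zk.getChildren(() -> refillAndRewatch(zk, nextItem));
        int toAdd = refillAmount(children.size(), CAPACITY);
        for (int i = 0; i < toAdd; i++) {
            zk.create("item-" + nextItem[0]++);
        }
    }

    static ToyZk run(boolean armWatch) {
        ToyZk zk = new ToyZk();
        int[] nextItem = {0};
        if (armWatch) {
            refillAndRewatch(zk, nextItem);   // initial fill with the watch armed
        } else {
            zk.getChildren(null);             // plain read: no watch, so nothing will ever fire
            for (int i = 0; i < CAPACITY; i++) {
                zk.create("item-" + nextItem[0]++);
            }
        }
        zk.deliverEvents();
        for (int i = 0; i < 6; i++) {
            zk.delete("item-" + i);           // a consumer takes 6 items
        }
        zk.deliverEvents();
        return zk;
    }

    public static void main(String[] args) {
        ToyZk watched = run(true);
        ToyZk unwatched = run(false);
        System.out.println("with watch:    size=" + watched.size() + ", notifications=" + watched.delivered);
        System.out.println("without watch: size=" + unwatched.size() + ", notifications=" + unwatched.delivered);
    }
}
```

In this model the watched run is topped back up to 10 items after the consumer removes 6, while the unwatched run stays at 4 with zero notifications. In Curator terms, the re-arming step would correspond to calling `getChildren().watched().forPath(...)` again from `eventReceived`, as the earlier producer does.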



On Mon, Nov 18, 2013 at 12:59 AM, Jordan Zimmerman <
jordan@jordanzimmerman.com> wrote:

> I don’t think there’s any reason to assume that each consumer will process
> an equal number of messages.
>
> -JZ
>
> On Nov 17, 2013, at 2:51 PM, Sznajder ForMailingList <
> bs4mailinglist@gmail.com> wrote:
>
> Hi Jordan,
>
> Regarding the output:
>
> As you can see, the log prints the consumer's name, the word "processed",
> and the item.
>
> I am running the program with 4 servers in my quorum:
> ir-hadoop1 is the producer;
> ir-hadoop2 to ir-hadoop4 are consumers.
>
> After 14 minutes, I simply counted the number of processed items on each
> consumer (a simple grep on the log file) and got the following:
>
> ir-hadoop2 : 3042 processed items
> ir-hadoop3 : 1276 processed items
> ir-hadoop4 : 830 processed items
>
> Looking at the processing times, I can see that ir-hadoop4 is idle most
> of the time. I have attached the log for ir-hadoop4 as an example.
>
> Benjamin
>
>
> On Mon, Nov 18, 2013 at 12:03 AM, Jordan Zimmerman <
> jordan@jordanzimmerman.com> wrote:
>
>> The example output is missing. Please provide that too.
>>
>> -Jordan
>>
> <ir-hadoop4_log.txt>
>
>
>

Re: Multiple consumers on a single server - strange behavior.

Posted by Jordan Zimmerman <jo...@jordanzimmerman.com>.
I don’t think there’s any reason to assume that each consumer will process an equal number of messages. 

-JZ
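Per-consumer counts like the ones quoted below are only reliable if they are collected safely. In the quoted consumer, `numberOfProcessedURL` is a plain `int` incremented in `consumeMessage` and meant to be read by `FileWriterThread` (currently commented out) on another thread, and a single `static SimpleDateFormat` is shared by every consumer in the JVM, although the `SimpleDateFormat` Javadoc states that date formats are not synchronized. A minimal JDK-only sketch of a thread-safe alternative, assuming Java 8+: `LongAdder` counters keyed by consumer id plus an immutable `DateTimeFormatter`. `ProcessedCounter`, `record`, `snapshot` and `report` are hypothetical names.

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical per-consumer counter that can be updated and read from different threads. */
public class ProcessedCounter {
    // DateTimeFormatter is immutable and thread-safe, unlike a shared SimpleDateFormat.
    static final DateTimeFormatter HH_MM_SS = DateTimeFormatter.ofPattern("HH:mm:ss");

    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    /** Would be called from each consumer's consumeMessage(...) callback. */
    public void record(String consumerId) {
        counts.computeIfAbsent(consumerId, id -> new LongAdder()).increment();
    }

    /** Safe to call from a reporting thread; values may lag increments still in flight. */
    public Map<String, Long> snapshot() {
        Map<String, Long> result = new TreeMap<>();
        counts.forEach((id, adder) -> result.put(id, adder.sum()));
        return result;
    }

    static String report(LocalTime time, Map<String, Long> snapshot) {
        return time.format(HH_MM_SS) + " " + snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessedCounter counter = new ProcessedCounter();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Simulate three consumers recording 1000 items each from several threads.
        for (int i = 0; i < 3000; i++) {
            String id = "id_" + (i % 3);
            pool.execute(() -> counter.record(id));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(report(LocalTime.now(), counter.snapshot()));
    }
}
```

Counting in-process this way avoids relying on log greps, and a periodic reporter can print `snapshot()` without racing the consumer callbacks.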

On Nov 17, 2013, at 2:51 PM, Sznajder ForMailingList <bs...@gmail.com> wrote:

> Hi Jordan,
> 
> Regarding the output:
> 
> As you can see, the log prints the consumer's name, the word "processed", and the item.
> 
> I am running the program with 4 servers in my quorum:
> ir-hadoop1 is the producer;
> ir-hadoop2 to ir-hadoop4 are consumers.
> 
> After 14 minutes, I simply counted the number of processed items on each consumer (a simple grep on the log file) and got the following:
> 
> ir-hadoop2 : 3042 processed items
> ir-hadoop3 : 1276 processed items
> ir-hadoop4 : 830 processed items
> 
> Looking at the processing times, I can see that ir-hadoop4 is idle most of the time. I have attached the log for ir-hadoop4 as an example.
> 
> Benjamin
> 
> 
> On Mon, Nov 18, 2013 at 12:03 AM, Jordan Zimmerman <jo...@jordanzimmerman.com> wrote:
> The example output is missing. Please provide that too.
> 
> -Jordan
> 
> On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <bs...@gmail.com> wrote:
> 
>> First of all, thank you for your answer.
>> 
>> Here is the simple code I used:
>> 
>> The producer and the queue consumer are given in the classes below.
>> 
>> Every 5 minutes, I print the number of processed items, and I see drastic differences between the consumers:
>> 
>> 
>> 
>> Producer:
>> =-=-=-=-=
>> 
>> package com.zk;
>> 
>> import java.io.Closeable;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>> import java.util.List;
>> 
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.api.CuratorEvent;
>> import org.apache.curator.framework.api.CuratorListener;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> 
>> public class QueueProducer implements Closeable {
>>     
>>     final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>>     
>>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>     
>>     protected static final String PATH = "/test_queue";
>> 
>>     protected static final String LOCK_PATH = "/test_lock_queue";
>>     
>>     private DistributedQueue<CrawlUrl> queue;
>>     
>>     private static final int QUEUE_SIZE = 100000;
>>     
>>     private int items;
>> 
>>     public QueueProducer(CuratorFramework framework) throws Exception {
>>         LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>>         System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>>         this.queue = Utils.newDistributedQueue(framework,
>>                 Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
>>         this.queue.start();
>>         addQueueContent(QUEUE_SIZE);
>>         System.out.println("Done with the initial init");
>> 
>> 
>>         // We register to the listener for monitoring the number of elements
>>         // in the queue
>>         framework.getCuratorListenable().addListener(new CuratorListener() {
>>             @Override
>>             public void eventReceived(final CuratorFramework framework_,
>>                     CuratorEvent event) throws Exception {
>>                 if (event.getPath() != null && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>>                     // this also restores the notification
>>                     List<String> children = framework_.getChildren()
>>                             .watched().forPath(Utils.CRAWL_QUEUE_PATH);
>>                     if (children.size() <= QUEUE_SIZE/2) {
>>                         addQueueContent(QUEUE_SIZE - children.size());
>>                     }
>>                 }
>>             }
>>         });
>> 
>> 
>>         while (true) {
>>             List<String> children = framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>>             if (children.size() <= QUEUE_SIZE/2) {
>>                 LOG.info(dateFormat.format(new Date()) + " - In the while(true) - We call for size " + children.size());
>>                 addQueueContent(QUEUE_SIZE - children.size());
>>             }            
>>                 
>>             Thread.sleep(5000);
>> 
>>         }
>>     }
>> 
>>     void addQueueContent(int numberOfItems) {
>>         LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
>>         for (int i = 0; i < numberOfItems; i++) {
>>             try {
>>                 CrawlUrl url = new CrawlUrl(""+this.items++);
>>                 this.queue.put(url);
>>             } catch (Exception e) {
>>                 LOG.error ("Caught an error when adding the item " + i + " in the initQueueContent()");
>>             }
>>         }
>>     }
>>     
>>     public static void main(String[] args) {
>>         CrawlerPropertyFile props;
>>         try {
>>             props = new CrawlerPropertyFile(args[0]);
>> 
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>> 
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>> 
>>             @SuppressWarnings("unused")
>>             QueueProducer producer = new QueueProducer(framework);
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>> 
>>     }
>> 
>>     @Override
>>     public void close() throws IOException {
>>         this.queue.close();
>>     }
>>     
>>     
>> 
>> }
>> 
>> 
>> 
>> 
>> Consumer 
>> =-=-=-=-=-
>> 
>> package com.zk;
>> 
>> import java.io.Closeable;
>> import java.io.File;
>> import java.io.FileWriter;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>> 
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.framework.recipes.queue.QueueConsumer;
>> import org.apache.curator.framework.state.ConnectionState;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> 
>> 
>> public class MyQueueConsumer implements Closeable{
>>     
>>     private DistributedQueue<CrawlUrl> queue;
>> 
>>     String name;
>> 
>>     String id;
>> 
>>     FileWriter timeCounter;
>>     
>>     final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);
>>     
>>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>     
>>     int numberOfProcessedURL;
>> 
>>     private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>>     
>>     private class FileWriterThread extends Thread {
>> 
>>         public FileWriterThread() {
>>             // empty ctor
>>         }
>> 
>>         @Override
>>         public void run() {
>>             // We write the stats:
>> 
>>             try {
>>                 while (true) {
>>                     MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "+
>>                             "[numberOfProcessed="+MyQueueConsumer.this.numberOfProcessedURL    +"]\n") ;
>>                     MyQueueConsumer.this.timeCounter.flush();
>>                 
>>                     // Sleeps 5 minutes
>>                     Thread.sleep(300000);
>>                 }
>>             } catch (Exception e) {
>>                 // TODO Auto-generated catch block
>>                 e.printStackTrace();
>>             }
>>         }
>>     }
>>     
>>     
>>     public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
>>         this.id = id;
>>         this.name = java.net.InetAddress.getLocalHost().getHostName();
>>         this.timeCounter = new FileWriter(new File("MyQueueConsumer_"+ this.name + "_" +id + "_timeCounter.txt"));
>>         
>> //        this.timeCounterThread = new FileWriterThread();
>> //        this.timeCounterThread.start();
>>         this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new QueueConsumer<CrawlUrl>() {
>> 
>>             @Override
>>             public void stateChanged(CuratorFramework client, ConnectionState newState) {
>>                 System.out.println(String.format("[%s] connection state changed to %s", id, newState));
>>             }
>> 
>>             @Override
>>             public void consumeMessage(CrawlUrl url) throws Exception {
>>                 try {
>>                     LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "["+id+ "-" + MyQueueConsumer.this.name+ "] processed " + url.url);
>>                     MyQueueConsumer.this.numberOfProcessedURL++;
>>                 } catch (Exception e) {
>>                     LOG.error( "["+id+ "-" + MyQueueConsumer.this.name+ "]" + e.getMessage() + " for url " + url.url );
>>                 } 
>>             }
>> 
>>         });
>>         try {
>>             this.queue.start();
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>> 
>>     }
>>     
>>     public static void main(String[] args) {
>>         try {
>>             CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>>     
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>>     
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>     
>>             final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>>     
>>             for (int i = 0; i < queueConsumers.length; i++) {
>>                 queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>>             }
>>     
>>             Runtime.getRuntime().addShutdownHook(new Thread() {
>>                 @Override
>>                 public void run() {
>>                     // close workers
>>                     Throwable t = null;
>>                     LOG.info("We close the workers");
>>                     for (MyQueueConsumer queueConsumer : queueConsumers) {
>>                         try {
>>                             queueConsumer.close();
>>                         } catch (Throwable th) {
>>                             if (t == null) {
>>                                 t = th;
>>                             }
>>                         }
>>                     }
>>                     // throw first exception that we encountered
>>                     if (t != null) {
>>                         throw new RuntimeException("some workers failed to close", t);
>>                     }
>>                 }
>>             });
>>             
>>         }catch (Exception e ){
>>             e.printStackTrace();
>>         }
>>     }
>> 
>>     @Override
>>     public void close() throws IOException {
>>         this.queue.close();
>>     }
>> }
>> 
>> 
>> 
>> 
>> Main
>> -=-=-
>> 
>> package com.zk;
>> 
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.test.TestingServer;
>> 
>> 
>> 
>> public class QueueTestMain {
>> 
>>     /**
>>      * @param args
>>      */
>>     public static void main(String[] args) {
>>         CrawlerPropertyFile props;
>>         try {
>>             props = new CrawlerPropertyFile(args[0]);
>> 
>>             final String connectString;
>>             System.out.println("DEBUG = " + Utils.DEBUG);
>>             if (props.useZkTestServer()) {
>>                 System.out.println("Will launch from zkTestServer");
>>                 TestingServer server = new TestingServer();
>>                 connectString = server.getConnectString();
>>             } else {
>>                 connectString = props.getZkServer();
>>             }
>> 
>>             final CuratorFramework framework = Utils.newFramework(connectString);
>>             framework.start();
>>             
>> 
>>             if (args.length > 1 && args[1].equalsIgnoreCase("true")) {
>>                 @SuppressWarnings("unused")
>>                 QueueProducer producer = new QueueProducer(framework);
>>             } else {
>>             
>>                 final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>>                 
>>                 for (int i = 0; i < queueConsumers.length; i++) {
>>                     queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>>                 }
>>         
>>                 Runtime.getRuntime().addShutdownHook(new Thread() {
>>                     @Override
>>                     public void run() {
>>                         // close workers
>>                         Throwable t = null;
>>                         for (MyQueueConsumer queueConsumer : queueConsumers) {
>>                             try {
>>                                 queueConsumer.close();
>>                             } catch (Throwable th) {
>>                                 if (t == null) {
>>                                     t = th;
>>                                 }
>>                             }
>>                         }
>>                         // throw first exception that we encountered
>>                         if (t != null) {
>>                             throw new RuntimeException("some workers failed to close", t);
>>                         }
>>                     }
>>                 });
>>     
>>                 
>>             }
>>         }catch (Exception e ){
>>             e.printStackTrace();
>>         }
>> 
>>     }
>> 
>> }
>> 
>> 
>> 
>> 
>> Example of output:
>> 
>> 
>> 
>> 
>> 
>> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <jo...@jordanzimmerman.com> wrote:
>> Can you produce a test that shows this? Anything else interesting in the log? Of course, there could be a bug.
>> 
>> -Jordan
>> 
>> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <bs...@gmail.com> wrote:
>> 
>> > Hi
>> >
>> > I made a short test as following:
>> >
>> > - I have a quorum of 3 nodes for ZooKeeper.
>> > - I wrote a QueueProducer class using Curator that continuously produces items (random integers): when the queue is 10% full, it creates new items.
>> > - I wrote a simple class using a Curator QueueConsumer, which simply logs "consumed item i".
>> >
>> > I tested several combinations:
>> > - running the consumers on one, two or three nodes.
>> > - running one or more consumers in parallel on a given node.
>> >
>> >
>> > But here is my question: I see some very strange behavior when I run several consumers in parallel on a node. For example, with 5 consumers per node on 3 nodes, the throughput is **very** low. Looking at my log, I see that most of the consumers are idle most of the time.
>> >
>> > Am I making a mistake somewhere?
>> > I was expecting to increase the throughput by adding consumers, so I am surprised to see the opposite.
>> >
>> > Thanks a lot
>> >
>> > Benjamin
>> 
>> 
> 
> 
> <ir-hadoop4_log.txt>
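In the consumer quoted above, `consumeMessage()` increments the plain `int` field `numberOfProcessedURL`, and the (commented-out) `FileWriterThread` would read it from a different thread. Without synchronization that reader is not guaranteed to see up-to-date values, and if the callback ever runs on more than one thread, `++` can also lose increments. A minimal stdlib-only sketch of the same 5-minute report, assuming a running total is all that is needed; `ProcessedCounter` is a hypothetical helper, not a Curator class:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical helper: a thread-safe processed-item counter with a periodic report. */
public class ProcessedCounter implements AutoCloseable {
    private final AtomicLong processed = new AtomicLong();
    private final ScheduledExecutorService reporter =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "processed-counter-reporter");
                t.setDaemon(true); // stats alone should not keep the JVM running
                return t;
            });

    /** Calls sink with the running total every period, starting after one period. */
    public ProcessedCounter(long period, TimeUnit unit, LongConsumer sink) {
        reporter.scheduleAtFixedRate(() -> sink.accept(processed.get()), period, period, unit);
    }

    /** Call once per item, e.g. at the end of QueueConsumer.consumeMessage(). */
    public long increment() {
        return processed.incrementAndGet();
    }

    public long get() {
        return processed.get();
    }

    @Override
    public void close() {
        reporter.shutdownNow(); // stops future reports; the count stays readable
    }

    public static void main(String[] args) {
        try (ProcessedCounter counter = new ProcessedCounter(5, TimeUnit.MINUTES,
                n -> System.out.println("[numberOfProcessed=" + n + "]"))) {
            counter.increment();
            counter.increment();
            System.out.println(counter.get()); // prints 2
        }
    }
}
```

`AtomicLong` makes both the increment in the queue callback and the read on the reporter thread safe without an explicit lock.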


Re: Multiple consumers on a single server - strange behavior.

Posted by Sznajder ForMailingList <bs...@gmail.com>.
Hi Jordan..

Regarding the output:

As you can see, the LOG prints the name of the consumer, the word "processed"
and the item.
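Each `consumeMessage()` call logs a line of the form `HH:mm:ss[<id>-<host>] processed <item>`, so the per-consumer tally can be computed per consumer id as well as per host. A small sketch, assuming the logging layout may prepend its own prefix to the message (hence `find()` rather than `matches()`); `ProcessedLogCounter` is a hypothetical name and the sample lines are made up:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ProcessedLogCounter {
    // Matches the consumer's message: HH:mm:ss[<id>-<host>] processed <item>
    private static final Pattern PROCESSED =
            Pattern.compile("(\\d{2}:\\d{2}:\\d{2})\\[(id_\\d+)-([^\\]]+)\\] processed (\\S+)");

    /** Counts "processed" lines per "<host>/<id>" key, ignoring any other log lines. */
    public static Map<String, Integer> countPerConsumer(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = PROCESSED.matcher(line);
            if (m.find()) { // the logger layout may add its own prefix before the message
                counts.merge(m.group(3) + "/" + m.group(2), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines in the format produced by consumeMessage()
        List<String> lines = List.of(
                "INFO  MyQueueConsumer - 10:00:01[id_0-ir-hadoop4] processed 17",
                "INFO  MyQueueConsumer - 10:00:01[id_1-ir-hadoop4] processed 18",
                "INFO  MyQueueConsumer - 10:00:09[id_0-ir-hadoop4] processed 42",
                "INFO  MyQueueConsumer - We close the workers");
        System.out.println(countPerConsumer(lines)); // {ir-hadoop4/id_0=2, ir-hadoop4/id_1=1}
    }
}
```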

I am running the program with 4 servers in my quorum:
ir-hadoop1 is the producer;
ir-hadoop2 to ir-hadoop4 are consumers.

After 14 minutes, I simply count the number of processed items on each
consumer (a simplistic grep on the LOG file) and get the following:

ir-hadoop2 : 3042 processed items
ir-hadoop3 : 1276 processed items
ir-hadoop4 : 830 processed items...
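Normalizing the three counts by the 14-minute window (840 s) makes the imbalance explicit: 5,148 items in total, about 6.1 items/s across all consumers, with ir-hadoop2 handling roughly 59% of the work and ir-hadoop4 about 16%. The arithmetic as a short sketch (`ConsumerShare` is just an illustrative name):

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class ConsumerShare {
    /** Items per second over the measurement window. */
    public static double rate(int processed, int seconds) {
        return processed / (double) seconds;
    }

    /** Percentage of all processed items handled by one host. */
    public static double sharePercent(int processed, int total) {
        return 100.0 * processed / total;
    }

    public static void main(String[] args) {
        // Counts reported in this thread after a 14-minute run
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("ir-hadoop2", 3042);
        counts.put("ir-hadoop3", 1276);
        counts.put("ir-hadoop4", 830);
        int seconds = 14 * 60;
        int total = counts.values().stream().mapToInt(Integer::intValue).sum();

        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.printf(Locale.ROOT, "%s: %.2f items/s, %.1f%% of total%n",
                    e.getKey(), rate(e.getValue(), seconds), sharePercent(e.getValue(), total));
        }
        System.out.printf(Locale.ROOT, "all: %d items, %.2f items/s%n", total, rate(total, seconds));
    }
}
```

Run as-is, it prints:

    ir-hadoop2: 3.62 items/s, 59.1% of total
    ir-hadoop3: 1.52 items/s, 24.8% of total
    ir-hadoop4: 0.99 items/s, 16.1% of total
    all: 5148 items, 6.13 items/s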

Looking at the processing times, I can see that ir-hadoop4 is idle most of
the time. I attach the LOG of ir-hadoop4 as an example.
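To quantify "idle most of the time" rather than eyeballing the log, the `HH:mm:ss` prefix that `consumeMessage()` writes for every item is enough. A sketch, assuming one consumer's timestamps have already been extracted from its log, that they all fall within a single day (no midnight rollover), and that one-second resolution suffices; `IdleGaps` is hypothetical and the sample timestamps are made up:

```java
import java.time.LocalTime;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical helper: idle statistics from one consumer's HH:mm:ss "processed" timestamps. */
public class IdleGaps {
    // Distinct seconds-of-day at which at least one item was processed (sorted).
    private static TreeSet<Integer> busySeconds(List<String> times) {
        TreeSet<Integer> busy = new TreeSet<>();
        for (String t : times) {
            busy.add(LocalTime.parse(t).toSecondOfDay()); // assumes no midnight rollover
        }
        return busy;
    }

    /** Longest gap, in seconds, between consecutive processing seconds. */
    public static int longestGapSeconds(List<String> times) {
        int longest = 0;
        Integer previous = null;
        for (int s : busySeconds(times)) {
            if (previous != null) {
                longest = Math.max(longest, s - previous);
            }
            previous = s;
        }
        return longest;
    }

    /** Fraction of whole seconds in [first, last] during which nothing was processed. */
    public static double idleFraction(List<String> times) {
        TreeSet<Integer> busy = busySeconds(times);
        if (busy.isEmpty()) {
            return 0.0;
        }
        int span = busy.last() - busy.first() + 1;
        return 1.0 - busy.size() / (double) span;
    }

    public static void main(String[] args) {
        // Hypothetical timestamps for one consumer: a burst, a 29 s silence, a burst
        List<String> times = List.of("10:00:00", "10:00:01", "10:00:30", "10:00:31");
        System.out.println(longestGapSeconds(times)); // 29
        System.out.println(idleFraction(times));      // 0.875
    }
}
```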

Benjamin


On Mon, Nov 18, 2013 at 12:03 AM, Jordan Zimmerman <
jordan@jordanzimmerman.com> wrote:

> The example output is missing. Please provide that too.
>
> -Jordan

Re: Multiple consumers on a single server - strange behavior.

Posted by Jordan Zimmerman <jo...@jordanzimmerman.com>.
The example output is missing. Please provide that too.

-Jordan
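The `QueueProducer` quoted below checks the child count of `Utils.CRAWL_QUEUE_PATH` every 5 seconds (and again from a `CuratorListener`) and, once it is at or below `QUEUE_SIZE/2`, puts `QUEUE_SIZE - size` new items. That is a 50% threshold, not the 10% mentioned in the first message. The rule isolated as a pure function, so the threshold is explicit and testable; `RefillRule` and `refillFraction` are hypothetical names:

```java
public class RefillRule {
    /**
     * Returns how many items to enqueue so the queue is back at capacity,
     * or 0 while it is still above refillFraction * capacity.
     * QueueProducer in this thread behaves like refillFraction = 0.5.
     */
    public static int itemsToAdd(int currentSize, int capacity, double refillFraction) {
        if (capacity <= 0 || refillFraction < 0 || refillFraction > 1) {
            throw new IllegalArgumentException("bad capacity or fraction");
        }
        int threshold = (int) (capacity * refillFraction);
        if (currentSize > threshold) {
            return 0; // still enough work queued
        }
        return Math.max(0, capacity - currentSize);
    }

    public static void main(String[] args) {
        System.out.println(itemsToAdd(60_000, 100_000, 0.5)); // 0: still above 50,000
        System.out.println(itemsToAdd(50_000, 100_000, 0.5)); // 50000
        System.out.println(itemsToAdd(9_000, 100_000, 0.1));  // 91000
    }
}
```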

On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <bs...@gmail.com> wrote:

> First of all, thank you for your answer.
> 
> Here is the simple code, I used:
> 
> The producer and queue consumer classes are below.
> 
> Every 5 minutes, I print the number of processed items, and I see drastic differences between the consumers:
> 
> 
> 
> Producer:
> =-=-=-=-=
> 
> package com.zk;
> 
> import java.io.Closeable;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.List;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.api.CuratorEvent;
> import org.apache.curator.framework.api.CuratorListener;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> 
> public class QueueProducer implements Closeable {
>     
>     final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>     
>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>     
>     protected static final String PATH = "/test_queue";
> 
>     protected static final String LOCK_PATH = "/test_lock_queue";
>     
>     private DistributedQueue<CrawlUrl> queue;
>     
>     private static final int QUEUE_SIZE = 100000;
>     
>     private int items;
> 
>     public QueueProducer(CuratorFramework framework) throws Exception {
>         LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>         System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>         this.queue = Utils.newDistributedQueue(framework,
>                 Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
>         this.queue.start();
>         addQueueContent(QUEUE_SIZE);
>         System.out.println("Done with the initial init");
> 
> 
>         // We register to the listener for monitoring the number of elements
>         // in the queue
>         framework.getCuratorListenable().addListener(new CuratorListener() {
>             @Override
>             public void eventReceived(final CuratorFramework framework_,
>                     CuratorEvent event) throws Exception {
>                 if (event.getPath() != null    && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>                     // this also restores the notification
>                     List<String> children = framework_.getChildren()
>                             .watched().forPath(Utils.CRAWL_QUEUE_PATH);
>                     if (children.size() <= QUEUE_SIZE/2) {
>                         addQueueContent(QUEUE_SIZE - children.size());
>                     }
>                 }
>             }
>         });
> 
> 
>         while (true) {
>             List<String> children = framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>             if (children.size() <= QUEUE_SIZE/2) {
>                 LOG.info(dateFormat.format(new Date()) + " - In the while(true) - We call for size " + children.size());
>                 addQueueContent(QUEUE_SIZE - children.size());
>             }            
>                 
>             Thread.sleep(5000);
> 
>         }
>     }
> 
>     void addQueueContent(int numberOfItems) {
>         LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
>         for (int i = 0; i < numberOfItems; i++) {
>             try {
>                 CrawlUrl url = new CrawlUrl(""+this.items++);
>                 this.queue.put(url);
>             } catch (Exception e) {
>                 LOG.error("Caught an error when adding the item " + i + " in addQueueContent()", e);
>             }
>         }
>     }
>     
>     public static void main(String[] args) {
>         CrawlerPropertyFile props;
>         try {
>             props = new CrawlerPropertyFile(args[0]);
> 
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
> 
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
> 
>             @SuppressWarnings("unused")
>             QueueProducer producer = new QueueProducer(framework);
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
> 
>     }
> 
>     @Override
>     public void close() throws IOException {
>         this.queue.close();
>     }
>     
>     
> 
> }
> 
> 
> 
> 
> Consumer 
> =-=-=-=-=-
> 
> package com.zk;
> 
> import java.io.Closeable;
> import java.io.File;
> import java.io.FileWriter;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.framework.recipes.queue.QueueConsumer;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> 
> 
> public class MyQueueConsumer implements Closeable{
>     
>     private DistributedQueue<CrawlUrl> queue;
> 
>     String name;
> 
>     String id;
> 
>     FileWriter timeCounter;
>     
>     final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);
>     
>     final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>     
>     int numberOfProcessedURL;
> 
>     private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>     
>     private class FileWriterThread extends Thread {
> 
>         public FileWriterThread() {
>             // empty ctor
>         }
> 
>         @Override
>         public void run() {
>             // We write the stats:
> 
>             try {
>                 while (true) {
>                     MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "+
>                             "[numberOfProcessed="+MyQueueConsumer.this.numberOfProcessedURL    +"]\n") ;
>                     MyQueueConsumer.this.timeCounter.flush();
>                 
>                     // Sleeps 5 minutes
>                     Thread.sleep(300000);
>                 }
>             } catch (Exception e) {
>                 // TODO Auto-generated catch block
>                 e.printStackTrace();
>             }
>         }
>     }
>     
>     
>     public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
>         this.id = id;
>         this.name = java.net.InetAddress.getLocalHost().getHostName();
>         this.timeCounter = new FileWriter(new File("MyQueueConsumer_"+ this.name + "_" +id + "_timeCounter.txt"));
>         
> //        this.timeCounterThread = new FileWriterThread();
> //        this.timeCounterThread.start();
>         this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new QueueConsumer<CrawlUrl>() {
> 
>             @Override
>             public void stateChanged(CuratorFramework client, ConnectionState newState) {
>                 System.out.println(String.format("[%s] connection state changed to %s", id, newState));
>             }
> 
>             @Override
>             public void consumeMessage(CrawlUrl url) throws Exception {
>                 try {
>                     LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "["+id+ "-" + MyQueueConsumer.this.name+ "] processed " + url.url);
>                     MyQueueConsumer.this.numberOfProcessedURL++;
>                 } catch (Exception e) {
>                     LOG.error( "["+id+ "-" + MyQueueConsumer.this.name+ "]" + e.getMessage() + " for url " + url.url );
>                 } 
>             }
> 
>         });
>         try {
>             this.queue.start();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
> 
>     }
>     
>     public static void main(String[] args) {
>         try {
>             CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>     
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
>     
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
>     
>             final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>     
>             for (int i = 0; i < queueConsumers.length; i++) {
>                 queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>             }
>     
>             Runtime.getRuntime().addShutdownHook(new Thread() {
>                 @Override
>                 public void run() {
>                     // close workers
>                     Throwable t = null;
>                     LOG.info("We close the workers");
>                     for (MyQueueConsumer queueConsumer : queueConsumers) {
>                         try {
>                             queueConsumer.close();
>                         } catch (Throwable th) {
>                             if (t == null) {
>                                 t = th;
>                             }
>                         }
>                     }
>                     // throw first exception that we encountered
>                     if (t != null) {
>                         throw new RuntimeException("some workers failed to close", t);
>                     }
>                 }
>             });
>             
>         }catch (Exception e ){
>             e.printStackTrace();
>         }
>     }
> 
>     @Override
>     public void close() throws IOException {
>         this.queue.close();
>     }
> }
> 
> 
> 
> 
> Main
> -=-=-
> 
> package com.zk;
> 
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.test.TestingServer;
> 
> 
> 
> public class QueueTestMain {
> 
>     /**
>      * @param args
>      */
>     public static void main(String[] args) {
>         CrawlerPropertyFile props;
>         try {
>             props = new CrawlerPropertyFile(args[0]);
> 
>             final String connectString;
>             System.out.println("DEBUG = " + Utils.DEBUG);
>             if (props.useZkTestServer()) {
>                 System.out.println("Will launch from zkTestServer");
>                 TestingServer server = new TestingServer();
>                 connectString = server.getConnectString();
>             } else {
>                 connectString = props.getZkServer();
>             }
> 
>             final CuratorFramework framework = Utils.newFramework(connectString);
>             framework.start();
>             
> 
>             if (args[1] != null && args[1].equalsIgnoreCase("true")) {
>                 @SuppressWarnings("unused")
>                 QueueProducer producer = new QueueProducer(framework);
>             } else {
>             
>                 final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];
>                 
>                 for (int i = 0; i < queueConsumers.length; i++) {
>                     queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>                 }
>         
>                 Runtime.getRuntime().addShutdownHook(new Thread() {
>                     @Override
>                     public void run() {
>                         // close workers
>                         Throwable t = null;
>                         for (MyQueueConsumer queueConsumer : queueConsumers) {
>                             try {
>                                 queueConsumer.close();
>                             } catch (Throwable th) {
>                                 if (t == null) {
>                                     t = th;
>                                 }
>                             }
>                         }
>                         // throw first exception that we encountered
>                         if (t != null) {
>                             throw new RuntimeException("some workers failed to close", t);
>                         }
>                     }
>                 });
>     
>                 
>             }
>         }catch (Exception e ){
>             e.printStackTrace();
>         }
> 
>     }
> 
> }
> 
> 
> 
> 
> Example of output:
> 
> 
> 
> 
> 
> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <jo...@jordanzimmerman.com> wrote:
> Can you produce a test that shows this? Anything else interesting in the log? Of course, there could be a bug.
> 
> -Jordan
> 
> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <bs...@gmail.com> wrote:
> 
> > Hi
> >
> > I made a short test as following:
> >
> > - I have a quorum of 3 nodes for ZooKeeper.
> > - I wrote a QueueProducer class using Curator that produces items (random integers) all the time: when the queue is 10% full, it creates new items.
> > - I wrote a simple class using the Curator QueueConsumer that simply prints "consumed item i" to the log.
> >
> > I tested several combinations:
> > - running the consumers on one, two or three nodes.
> > - running one or more consumers in parallel on a given node.
> >
> >
> > But, and here is my question: I see some very strange behavior when I have several consumers in parallel on a node. For example, running 5 consumers per node on 3 nodes, I see **very** slow throughput. When looking at my log, I see that most of the consumers are idle most of the time....
> >
> > Am I making a mistake somewhere?
> >  I was expecting to increase the throughput by adding consumers, and I am surprised to see the opposite....
> >
> > Thanks a lot
> >
> > Benjamin
> 
> 


Re: Multiple consumers on a single server - strange behavior.

Posted by Sznajder ForMailingList <bs...@gmail.com>.
First of all, thank you for your answer.

Here is the simple code I used.

The producer and the queue consumer are each given in their own class.

Every 5 minutes, I print the number of processed items, and I see some
drastic differences between the consumers:
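A note on the per-consumer counts: in the consumer below, `numberOfProcessedURL` is a plain `int` incremented inside `consumeMessage()` and, when the commented-out `FileWriterThread` is enabled, read from that separate thread, so the reported numbers are not guaranteed to be up to date. A minimal sketch of a thread-safe alternative, assuming Java 8+; `ProcessedCounter` is a hypothetical helper, not part of Curator or of the original code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical helper, not part of Curator or the original code: a counter that is
// safe to increment from the consumer callback and to read from another thread.
final class ProcessedCounter implements AutoCloseable {
    private final AtomicLong processed = new AtomicLong();

    // Single daemon thread for periodic reports, so reporting alone never keeps the JVM alive.
    private final ScheduledExecutorService reporter =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "processed-counter-reporter");
                t.setDaemon(true);
                return t;
            });

    // Call once per consumed item, e.g. at the end of consumeMessage().
    void increment() {
        processed.incrementAndGet();
    }

    long get() {
        return processed.get();
    }

    // Hands the current total to the sink every period, starting after one period.
    void startReporting(long period, TimeUnit unit, Consumer<Long> sink) {
        reporter.scheduleAtFixedRate(() -> sink.accept(processed.get()), period, period, unit);
    }

    @Override
    public void close() {
        reporter.shutdownNow();
    }
}
```

In `consumeMessage()` one would call `counter.increment()`, and start reporting with something like `counter.startReporting(5, TimeUnit.MINUTES, n -> LOG.info("processed=" + n))`.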



Producer:
=-=-=-=-=

package com.zk;

import java.io.Closeable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class QueueProducer implements Closeable {

    final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);

    final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    protected static final String PATH = "/test_queue";

    protected static final String LOCK_PATH = "/test_lock_queue";

    private DistributedQueue<CrawlUrl> queue;

    private static final int QUEUE_SIZE = 100000;

    private int items;

    public QueueProducer(CuratorFramework framework) throws Exception {
        LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
        System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
        this.queue = Utils.newDistributedQueue(framework,
                Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
        this.queue.start();
        addQueueContent(QUEUE_SIZE);
        System.out.println("Done with the initial init");


        // We register to the listener for monitoring the number of elements
        // in the queue
        framework.getCuratorListenable().addListener(new CuratorListener() {
            @Override
            public void eventReceived(final CuratorFramework framework_,
                    CuratorEvent event) throws Exception {
                if (event.getPath() != null && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
                    // this also restores the notification
                    List<String> children = framework_.getChildren()
                            .watched().forPath(Utils.CRAWL_QUEUE_PATH);
                    if (children.size() <= QUEUE_SIZE/2) {
                        addQueueContent(QUEUE_SIZE - children.size());
                    }
                }
            }
        });


        while (true) {
            List<String> children = framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
            if (children.size() <= QUEUE_SIZE/2) {
                LOG.info(dateFormat.format(new Date()) + " - In the while(true) - We call for size " + children.size());
                addQueueContent(QUEUE_SIZE - children.size());
            }

            Thread.sleep(5000);

        }
    }

    // synchronized: called both from the CuratorListener callback and from the
    // polling loop in the constructor, and both paths increment this.items
    synchronized void addQueueContent(int numberOfItems) {
        LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
        for (int i = 0; i < numberOfItems; i++) {
            try {
                CrawlUrl url = new CrawlUrl("" + this.items++);
                this.queue.put(url);
            } catch (Exception e) {
                LOG.error("Caught an error when adding the item " + i + " in addQueueContent()", e);
            }
        }
    }

    public static void main(String[] args) {
        CrawlerPropertyFile props;
        try {
            props = new CrawlerPropertyFile(args[0]);

            final String connectString;
            System.out.println("DEBUG = " + Utils.DEBUG);
            if (props.useZkTestServer()) {
                System.out.println("Will launch from zkTestServer");
                TestingServer server = new TestingServer();
                connectString = server.getConnectString();
            } else {
                connectString = props.getZkServer();
            }

            final CuratorFramework framework = Utils.newFramework(connectString);
            framework.start();

            @SuppressWarnings("unused")
            QueueProducer producer = new QueueProducer(framework);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    @Override
    public void close() throws IOException {
        this.queue.close();
    }



}
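The producer's refill rule can be isolated and tested on its own. Note that the first message says new items are created when the queue is 10% full, while the code above refills when `children.size() <= QUEUE_SIZE/2`, i.e. at 50%. A minimal sketch that makes the threshold an explicit parameter; `RefillPolicy` is a hypothetical name, not something in Curator or in the posted code:

```java
// Hypothetical refill policy mirroring QueueProducer above: when the number of
// children under the queue path drops to or below a low-water mark, top the
// queue back up to its target size.
final class RefillPolicy {
    private final int targetSize;
    private final int lowWaterMark;

    RefillPolicy(int targetSize, double lowWaterFraction) {
        if (targetSize <= 0) {
            throw new IllegalArgumentException("targetSize must be positive");
        }
        if (lowWaterFraction < 0.0 || lowWaterFraction > 1.0) {
            throw new IllegalArgumentException("lowWaterFraction must be in [0, 1]");
        }
        this.targetSize = targetSize;
        this.lowWaterMark = (int) (targetSize * lowWaterFraction);
    }

    // Number of items to enqueue for the current queue length (0 while above the mark).
    int itemsToAdd(int currentSize) {
        return currentSize <= lowWaterMark ? Math.max(0, targetSize - currentSize) : 0;
    }
}
```

`new RefillPolicy(QUEUE_SIZE, 0.5).itemsToAdd(children.size())` reproduces the posted behavior; a fraction of `0.1` would match the 10% described in the first message.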




Consumer
=-=-=-=-=-

package com.zk;

import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



public class MyQueueConsumer implements Closeable{

    private DistributedQueue<CrawlUrl> queue;

    String name;

    String id;

    FileWriter timeCounter;

    final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);

    final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    int numberOfProcessedURL;

    private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;

    private class FileWriterThread extends Thread {

        public FileWriterThread() {
            // empty ctor
        }

        @Override
        public void run() {
            // We write the stats:

            try {
                while (true) {

                    MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " [numberOfProcessed=" + MyQueueConsumer.this.numberOfProcessedURL + "]\n");
                    MyQueueConsumer.this.timeCounter.flush();

                    // Sleeps 5 minutes
                    Thread.sleep(300000);
                }
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }


    public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
        this.id = id;
        this.name = java.net.InetAddress.getLocalHost().getHostName();
        this.timeCounter = new FileWriter(new File("MyQueueConsumer_" + this.name + "_" + id + "_timeCounter.txt"));

//        this.timeCounterThread = new FileWriterThread();
//        this.timeCounterThread.start();
        this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new QueueConsumer<CrawlUrl>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println(String.format("[%s] connection state changed to %s", id, newState));
            }

            @Override
            public void consumeMessage(CrawlUrl url) throws Exception {
                try {
                    LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "[" + id + "-" + MyQueueConsumer.this.name + "] processed " + url.url);
                    MyQueueConsumer.this.numberOfProcessedURL++;
                } catch (Exception e) {
                    LOG.error("[" + id + "-" + MyQueueConsumer.this.name + "]" + e.getMessage() + " for url " + url.url);
                }
            }

        });
        try {
            this.queue.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        try {
            CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);

            final String connectString;
            System.out.println("DEBUG = " + Utils.DEBUG);
            if (props.useZkTestServer()) {
                System.out.println("Will launch from zkTestServer");
                TestingServer server = new TestingServer();
                connectString = server.getConnectString();
            } else {
                connectString = props.getZkServer();
            }

            final CuratorFramework framework = Utils.newFramework(connectString);
            framework.start();

            final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];

            for (int i = 0; i < queueConsumers.length; i++) {
                queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
            }

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    // close workers
                    Throwable t = null;
                    LOG.info("We close the workers");
                    for (MyQueueConsumer queueConsumer : queueConsumers) {
                        try {
                            queueConsumer.close();
                        } catch (Throwable th) {
                            if (t == null) {
                                t = th;
                            }
                        }
                    }
                    // throw first exception that we encountered
                    if (t != null) {
                        throw new RuntimeException("some workers failed to close", t);
                    }
                }
            });

        }catch (Exception e ){
            e.printStackTrace();
        }
    }

    @Override
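    public void close() throws IOException {
        try {
            this.queue.close();
        } finally {
            // also release the per-consumer stats file opened in the constructor
            this.timeCounter.close();
        }
    }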
}
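One possible explanation for the idle consumers, offered as an assumption rather than a diagnosis: as far as I understand Curator's `DistributedQueue`, each consumer reads the children of the queue node, sorts them, and tries to take them in order, so consumers race for the same items and the losers skip ahead. When `consumeMessage()` does almost nothing (here it only logs), that coordination cost can dominate. The in-memory simulation below uses plain Java with no ZooKeeper at all; `ContentionSimulation` is hypothetical, and it only counts successful claims versus attempts that lose the race:

```java
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

// In-memory simulation (not Curator code) of competing consumers that all walk the
// same sorted snapshot of item names and race to claim each one. Only the first
// consumer to remove an item wins; every other attempt on that item is wasted.
final class ContentionSimulation {
    static final class Result {
        final long claimed;
        final long wastedAttempts;

        Result(long claimed, long wastedAttempts) {
            this.claimed = claimed;
            this.wastedAttempts = wastedAttempts;
        }
    }

    static Result run(int items, int consumers) throws InterruptedException {
        ConcurrentSkipListSet<String> queue = new ConcurrentSkipListSet<>();
        for (int i = 0; i < items; i++) {
            // zero-padded names sort in insertion order, like sequential znodes
            queue.add(String.format("queue-%010d", i));
        }
        AtomicLong claimed = new AtomicLong();
        AtomicLong wasted = new AtomicLong();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            threads[c] = new Thread(() -> {
                try {
                    start.await(); // release all consumers at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                while (!queue.isEmpty()) {
                    // every consumer takes the same ordered snapshot...
                    for (String name : queue.toArray(new String[0])) {
                        // ...and only the first remover of each name wins
                        if (queue.remove(name)) {
                            claimed.incrementAndGet();
                        } else {
                            wasted.incrementAndGet();
                        }
                    }
                }
            });
            threads[c].start();
        }
        start.countDown();
        for (Thread t : threads) {
            t.join();
        }
        return new Result(claimed.get(), wasted.get());
    }
}
```

With one consumer no attempt is wasted; with several, wasted attempts typically appear, and against a real ensemble each attempt would be a network round trip rather than an in-memory `remove`.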




Main
-=-=-

package com.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;



public class QueueTestMain {

    /**
     * @param args
     */
    public static void main(String[] args) {
        CrawlerPropertyFile props;
        try {
            props = new CrawlerPropertyFile(args[0]);

            final String connectString;
            System.out.println("DEBUG = " + Utils.DEBUG);
            if (props.useZkTestServer()) {
                System.out.println("Will launch from zkTestServer");
                TestingServer server = new TestingServer();
                connectString = server.getConnectString();
            } else {
                connectString = props.getZkServer();
            }

            final CuratorFramework framework = Utils.newFramework(connectString);
            framework.start();


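            // A second argument of "true" runs the producer; otherwise (including no second argument) run consumers.
            if (args.length > 1 && args[1].equalsIgnoreCase("true")) {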
                @SuppressWarnings("unused")
                QueueProducer producer = new QueueProducer(framework);
            } else {

                final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];

                for (int i = 0; i < queueConsumers.length; i++) {
                    queueConsumers[i] = new MyQueueConsumer(framework, "id_" + i);
                }

                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        // close workers
                        Throwable t = null;
                        for (MyQueueConsumer queueConsumer : queueConsumers) {
                            try {
                                queueConsumer.close();
                            } catch (Throwable th) {
                                if (t == null) {
                                    t = th;
                                }
                            }
                        }
                        // throw first exception that we encountered
                        if (t != null) {
                            throw new RuntimeException("some workers failed to close", t);
                        }
                    }
                });


            }
        }catch (Exception e ){
            e.printStackTrace();
        }

    }

}
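The shutdown-hook loop appears twice, in `MyQueueConsumer.main` and in `QueueTestMain.main`. A minimal sketch of a shared helper, assuming Java 7+ (multi-catch and `Throwable.addSuppressed`); `CloseAll` is a hypothetical name. Unlike the original, which keeps only the first `Throwable`, it records later failures as suppressed exceptions, and it lets `Error`s propagate immediately instead of catching them:

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical helper that factors out the shutdown-hook loop: close every resource,
// remember the first failure, and attach later failures to it as suppressed exceptions.
final class CloseAll {
    private CloseAll() {
    }

    static void closeAll(Iterable<? extends Closeable> resources) {
        RuntimeException first = null;
        for (Closeable resource : resources) {
            try {
                resource.close();
            } catch (IOException | RuntimeException e) {
                if (first == null) {
                    first = new RuntimeException("some workers failed to close", e);
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

In the hook body this would become `CloseAll.closeAll(Arrays.asList(queueConsumers));`, with `java.util.Arrays` imported.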




Example of output:





On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <
jordan@jordanzimmerman.com> wrote:

> Can you produce a test that shows this? Anything else interesting in the
> log? Of course, there could be a bug.
>
> -Jordan
>
> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <
> bs4mailinglist@gmail.com> wrote:
>
> > Hi
> >
> > I made a short test as following:
> >
> > - I have a chorum of 3 nodes for Zookeeper.
> > - I wrote a class using Curator QueueProducer who produces all the time
> (when the queue is 10% full, it creates new items) , items (random integer)
> > - I wrote a simple class using Curator Queue Consumer which simply
> prints to Log "consumed item i".
> >
> > I tested some different combinations :
> > - running the consumers on one, two or three nodes.
> > - running one or more consumers in parallel on a given node.
> >
> >
> > But, and here is my question: I see some very strange behavior when I
> have several consummers in parallel on a node. For example, running 5
> consumers per node on 3 nodes, I see a throughput **very** slow. When
> looking at my Log, I see that most of the consumers are most of the time on
> an idle state....
> >
> > Do I mistake somewhere?
> >  I was expecting to enhance the throughput by augmenting the number of
> consumers, I am surprised to see the opposite....
> >
> > Thanks a lot
> >
> > Benjamin
>
>

Re: Multiple consumers on a single server - strange behavior.

Posted by Jordan Zimmerman <jo...@jordanzimmerman.com>.
Can you produce a test that shows this? Anything else interesting in the log? Of course, there could be a bug.

-Jordan

On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <bs...@gmail.com> wrote:

> Hi
> 
> I made a short test as following:
> 
> - I have a quorum of 3 nodes for ZooKeeper.
> - I wrote a QueueProducer class using Curator that produces items (random integers) all the time: when the queue is 10% full, it creates new items.
> - I wrote a simple class using the Curator QueueConsumer that simply prints "consumed item i" to the log.
> 
> I tested several combinations:
> - running the consumers on one, two or three nodes.
> - running one or more consumers in parallel on a given node.
> 
> 
> But, and here is my question: I see some very strange behavior when I have several consumers in parallel on a node. For example, running 5 consumers per node on 3 nodes, I see **very** slow throughput. When looking at my log, I see that most of the consumers are idle most of the time....
> 
> Am I making a mistake somewhere?
>  I was expecting to increase the throughput by adding consumers, and I am surprised to see the opposite....
> 
> Thanks a lot
> 
> Benjamin