Posted to dev@zookeeper.apache.org by "Joe Gamache (Created) (JIRA)" <ji...@apache.org> on 2012/03/15 22:12:39 UTC

[jira] [Created] (ZOOKEEPER-1418) Just a bug in the tutorial code on the website

Just a bug in the tutorial code on the website
----------------------------------------------

                 Key: ZOOKEEPER-1418
                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-1418
             Project: ZooKeeper
          Issue Type: Bug
          Components: documentation
    Affects Versions: 3.4.3
            Reporter: Joe Gamache
            Priority: Minor


When I ran the Queue example from here: http://zookeeper.apache.org/doc/trunk/zookeeperTutorial.html
the producer created entries of the form:  /app1/element0000000001...
but the consumer tried to consume entries of the form:  /app1/element1...

I fixed the code, but I don't see how to attach a file. Here is the code:


import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class SyncPrimitive implements Watcher {

    static ZooKeeper zk = null;
    static Integer mutex;

    String root;

    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
        //else mutex = new Integer(-1);
    }

    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            //System.out.println("Process: " + event.getType());
            mutex.notify();
        }
    }

    /**
     * Barrier
     */
    static public class Barrier extends SyncPrimitive {
        int size;
        String name;

        /**
         * Barrier constructor
         *
         * @param address
         * @param root
         * @param size
         */
        Barrier(String address, String root, int size) {
            super(address);
            this.root = root;
            this.size = size;

            // Create barrier node
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }

            // My node name
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                System.out.println(e.toString());
            }

        }

        /**
         * Join barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean enter() throws KeeperException, InterruptedException{
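            // Use a plain EPHEMERAL node: leave() deletes root + "/" + name,
            // which would not match a name carrying a sequential suffix.
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);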
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);

                    if (list.size() < size) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
        }

        /**
         * Wait until all reach barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean leave() throws KeeperException, InterruptedException{
            zk.delete(root + "/" + name, 0);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                        if (list.size() > 0) {
                            mutex.wait();
                        } else {
                            return true;
                        }
                    }
                }
        }
    }

    /**
     * Producer-Consumer queue
     */
    static public class Queue extends SyncPrimitive {

        /**
         * Constructor of producer-consumer queue
         *
         * @param address
         * @param name
         */
        Queue(String address, String name) {
            super(address);
            this.root = name;
            // Create ZK node name
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }

        /**
         * Add element to the queue.
         *
         * @param i
         * @return
         */

        boolean produce(int i) throws KeeperException, InterruptedException{
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;

            // Add child with value i
            b.putInt(i);
            value = b.array();
            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);

            return true;
        }


        /**
         * Remove first element from the queue.
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        int consume() throws KeeperException, InterruptedException{
            int retvalue = -1;
            Stat stat = null;

            // Get the first element available
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                    if (list.size() == 0) {
                        System.out.println("Going to wait");
                        mutex.wait();
                    } else {
                        Integer min = new Integer(list.get(0).substring(7));
                        for(String s : list){
                            Integer tempValue = new Integer(s.substring(7));
                            //System.out.println("Temporary value: " + tempValue);
                            if(tempValue < min) min = tempValue;
                        }
                        // Sequential znode names end in a 10-digit, zero-padded
                        // counter, so rebuild the name with the same padding.
                        String element = String.format("/element%010d", min);

                        System.out.println("Temporary value: " + root + element);
                        byte[] b = zk.getData(root + element,
                                    false, stat);
                        zk.delete(root + element, 0);
                        ByteBuffer buffer = ByteBuffer.wrap(b);
                        retvalue = buffer.getInt();

                        return retvalue;
                    }
                }
            }
        }
    }

    public static void main(String args[]) {
        if (args[0].equals("qTest"))
            queueTest(args);
        else
            barrierTest(args);

    }

    public static void queueTest(String args[]) {
        Queue q = new Queue(args[1], "/app1");

        System.out.println("Input: " + args[1]);
        int i;
        Integer max = new Integer(args[2]);

        if (args[3].equals("p")) {
            System.out.println("Producer");
            for (i = 0; i < max; i++)
                try{
                    q.produce(10 + i);
                } catch (KeeperException e){

                } catch (InterruptedException e){

                }
        } else {
            System.out.println("Consumer");

            for (i = 0; i < max; i++) {
                try{
                    int r = q.consume();
                    System.out.println("Item: " + r);
                } catch (KeeperException e){
                    i--;
                } catch (InterruptedException e){

                }
            }
        }
    }

    public static void barrierTest(String args[]) {
        Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
        try{
            boolean flag = b.enter();
            System.out.println("Entered barrier: " + args[2]);
            if(!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }

        // Generate random integer
        Random rand = new Random();
        int r = rand.nextInt(100);
        // Loop for rand iterations
        for (int i = 0; i < r; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {

            }
        }
        try{
            b.leave();
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }
        System.out.println("Left barrier");
    }
}
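
For illustration, the naming mismatch described in this report can be reproduced without a ZooKeeper server. The sketch below is not part of the tutorial or of the ZooKeeper API: the class QueueNodeNames and its methods are hypothetical names, and the child names are hard-coded stand-ins for what getChildren() might return. It assumes the 10-digit zero padding seen in the report (element0000000001). It shows two possible ways to address the smallest element: keep the child name exactly as listed, or rebuild it with the same padding.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of the tutorial.
class QueueNodeNames {
    static final String PREFIX = "element";

    // Parses the numeric counter that follows the "element" prefix.
    static int sequenceOf(String child) {
        return Integer.parseInt(child.substring(PREFIX.length()));
    }

    // Rebuilds a child name with the 10-digit zero padding seen in the report.
    static String paddedName(int sequence) {
        return String.format("%s%010d", PREFIX, sequence);
    }

    // Returns the child with the lowest counter, keeping its name verbatim
    // so that no padding has to be reconstructed.
    static String smallestChild(List<String> children) {
        String best = children.get(0);
        for (String child : children) {
            if (sequenceOf(child) < sequenceOf(best)) {
                best = child;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Stand-in for the list a getChildren() call might return.
        List<String> children = Arrays.asList(
                "element0000000003", "element0000000001", "element0000000002");
        String first = smallestChild(children);

        // Name as listed: matches what the producer created.
        System.out.println("/app1/" + first);
        // Name rebuilt from the bare integer: the form the consumer looked for.
        System.out.println("/app1/" + PREFIX + sequenceOf(first));
        // Name rebuilt with padding: matches the producer again.
        System.out.println("/app1/" + paddedName(sequenceOf(first)));
    }
}
```

Keeping the listed child name avoids depending on the padding width at all, which may be the more robust choice if the naming format ever differs from the one assumed here.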

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (ZOOKEEPER-1418) Just a bug in the tutorial code on the website

Posted by "Joe Gamache (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/ZOOKEEPER-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joe Gamache updated ZOOKEEPER-1418:
-----------------------------------

    Attachment: SyncPrimitive.java

Updated file _should_ be attached.
                
> Just a bug in the tutorial code on the website
> ----------------------------------------------
>
>                 Key: ZOOKEEPER-1418
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-1418
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: documentation
>    Affects Versions: 3.4.3
>            Reporter: Joe Gamache
>            Assignee: Joe Gamache
>            Priority: Minor
>         Attachments: SyncPrimitive.java
>
>
> When I ran the Queue example from here: http://zookeeper.apache.org/doc/trunk/zookeeperTutorial.html
> The producer created entries of the form:  /app1/element0000000001...
> but the consumer tried to consume of the form:  /app1/element1...
> adding a patch with the file attached.


        

[jira] [Commented] (ZOOKEEPER-1418) Just a bug in the tutorial code on the website

Posted by "Patrick Hunt (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/ZOOKEEPER-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13230560#comment-13230560 ] 

Patrick Hunt commented on ZOOKEEPER-1418:
-----------------------------------------

Hi Joe, please see the following (you can attach a patch file to this JIRA: "attach files" under "more actions"):
https://cwiki.apache.org/confluence/display/ZOOKEEPER/HowToContribute
                
> Just a bug in the tutorial code on the website
> ----------------------------------------------
>
>                 Key: ZOOKEEPER-1418
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-1418
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: documentation
>    Affects Versions: 3.4.3
>            Reporter: Joe Gamache
>            Priority: Minor
>
> When I ran the Queue example from here: http://zookeeper.apache.org/doc/trunk/zookeeperTutorial.html
> The producer created entries of the form:  /app1/element0000000001...
> but the consumer tried to consume of the form:  /app1/element1...
> I fixed the code, but don't see how to attach a file??  Ok, here is the code:
> [quoted SyncPrimitive.java listing snipped; see the original message above]


        

[jira] [Commented] (ZOOKEEPER-1418) Just a bug in the tutorial code on the website

Posted by "Patrick Hunt (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/ZOOKEEPER-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13235334#comment-13235334 ] 

Patrick Hunt commented on ZOOKEEPER-1418:
-----------------------------------------

Hey Joe. No worries. I think what you intend is to update this file:

src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml

The easiest way to do this is:
1) svn co http://svn.apache.org/repos/asf/zookeeper/trunk
2) edit the tutorial
3) svn diff > ZOOKEEPER-1418.patch (run from within the trunk directory)
4) attach that file here

Can you give that a try? If you run into trouble, just let me know. I appreciate you sticking with it. Regards.
                
> Just a bug in the tutorial code on the website
> ----------------------------------------------
>
>                 Key: ZOOKEEPER-1418
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-1418
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: documentation
>    Affects Versions: 3.4.3
>            Reporter: Joe Gamache
>            Assignee: Patrick Hunt
>            Priority: Minor
>         Attachments: SyncPrimitive.java
>
>
> When I ran the Queue example from here: http://zookeeper.apache.org/doc/trunk/zookeeperTutorial.html
> The producer created entries of the form:  /app1/element0000000001...
> but the consumer tried to consume of the form:  /app1/element1...
> adding a patch with the file attached.


        

[jira] [Commented] (ZOOKEEPER-1418) Just a bug in the tutorial code on the website

Posted by "Hadoop QA (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/ZOOKEEPER-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13232900#comment-13232900 ] 

Hadoop QA commented on ZOOKEEPER-1418:
--------------------------------------

-1 overall.  Here are the results of testing the latest attachment 
  http://issues.apache.org/jira/secure/attachment/12518940/SyncPrimitive.java
  against trunk revision 1302281.

    +1 @author.  The patch does not contain any @author tags.

    +0 tests included.  The patch appears to be a documentation patch that doesn't require tests.

    -1 patch.  The patch command could not apply the patch.

Console output: https://builds.apache.org/job/PreCommit-ZOOKEEPER-Build/1001//console

This message is automatically generated.
                
> Just a bug in the tutorial code on the website
> ----------------------------------------------
>
>                 Key: ZOOKEEPER-1418
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-1418
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: documentation
>    Affects Versions: 3.4.3
>            Reporter: Joe Gamache
>            Assignee: Joe Gamache
>            Priority: Minor
>         Attachments: SyncPrimitive.java
>
>
> When I ran the Queue example from here: http://zookeeper.apache.org/doc/trunk/zookeeperTutorial.html
> The producer created entries of the form:  /app1/element0000000001...
> but the consumer tried to consume of the form:  /app1/element1...
> adding a patch with the file attached.


        

[jira] [Assigned] (ZOOKEEPER-1418) Just a bug in the tutorial code on the website

Posted by "Patrick Hunt (Assigned) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/ZOOKEEPER-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Hunt reassigned ZOOKEEPER-1418:
---------------------------------------

    Assignee: Joe Gamache
    
> Just a bug in the tutorial code on the website
> ----------------------------------------------
>
>                 Key: ZOOKEEPER-1418
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-1418
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: documentation
>    Affects Versions: 3.4.3
>            Reporter: Joe Gamache
>            Assignee: Joe Gamache
>            Priority: Minor
>
> When I ran the Queue example from here: http://zookeeper.apache.org/doc/trunk/zookeeperTutorial.html
> The producer created entries of the form:  /app1/element0000000001...
> but the consumer tried to consume of the form:  /app1/element1...
> I fixed the code, but don't see how to attach a file??  Ok, here is the code:
> import java.io.IOException;
> import java.net.InetAddress;
> import java.net.UnknownHostException;
> import java.nio.ByteBuffer;
> import java.util.List;
> import java.util.Random;
> import org.apache.zookeeper.CreateMode;
> import org.apache.zookeeper.KeeperException;
> import org.apache.zookeeper.WatchedEvent;
> import org.apache.zookeeper.Watcher;
> import org.apache.zookeeper.ZooKeeper;
> import org.apache.zookeeper.ZooDefs.Ids;
> import org.apache.zookeeper.data.Stat;
> public class SyncPrimitive implements Watcher {
>     static ZooKeeper zk = null;
>     static Integer mutex;
>     String root;
>     SyncPrimitive(String address) {
>         if(zk == null){
>             try {
>                 System.out.println("Starting ZK:");
>                 zk = new ZooKeeper(address, 3000, this);
>                 mutex = new Integer(-1);
>                 System.out.println("Finished starting ZK: " + zk);
>             } catch (IOException e) {
>                 System.out.println(e.toString());
>                 zk = null;
>             }
>         }
>         //else mutex = new Integer(-1);
>     }
>     synchronized public void process(WatchedEvent event) {
>         synchronized (mutex) {
>             //System.out.println("Process: " + event.getType());
>             mutex.notify();
>         }
>     }
>     /**
>      * Barrier
>      */
>     static public class Barrier extends SyncPrimitive {
>         int size;
>         String name;
>         /**
>          * Barrier constructor
>          *
>          * @param address
>          * @param root
>          * @param size
>          */
>         Barrier(String address, String root, int size) {
>             super(address);
>             this.root = root;
>             this.size = size;
>             // Create barrier node
>             if (zk != null) {
>                 try {
>                     Stat s = zk.exists(root, false);
>                     if (s == null) {
>                         zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
>                                 CreateMode.PERSISTENT);
>                     }
>                 } catch (KeeperException e) {
>                     System.out
>                             .println("Keeper exception when instantiating queue: "
>                                     + e.toString());
>                 } catch (InterruptedException e) {
>                     System.out.println("Interrupted exception");
>                 }
>             }
>             // My node name
>             try {
>                 name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
>             } catch (UnknownHostException e) {
>                 System.out.println(e.toString());
>             }
>         }
>         /**
>          * Join barrier
>          *
>          * @return
>          * @throws KeeperException
>          * @throws InterruptedException
>          */
>         boolean enter() throws KeeperException, InterruptedException{
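>             // Use a plain EPHEMERAL node: leave() deletes root + "/" + name,
>             // which would not match a name carrying a sequential suffix.
>             zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
>                     CreateMode.EPHEMERAL);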
>             while (true) {
>                 synchronized (mutex) {
>                     List<String> list = zk.getChildren(root, true);
>                     if (list.size() < size) {
>                         mutex.wait();
>                     } else {
>                         return true;
>                     }
>                 }
>             }
>         }
>         /**
>          * Wait until all reach barrier
>          *
>          * @return
>          * @throws KeeperException
>          * @throws InterruptedException
>          */
>         boolean leave() throws KeeperException, InterruptedException{
>             zk.delete(root + "/" + name, 0);
>             while (true) {
>                 synchronized (mutex) {
>                     List<String> list = zk.getChildren(root, true);
>                     if (list.size() > 0) {
>                         mutex.wait();
>                     } else {
>                         return true;
>                     }
>                 }
>             }
>         }
>     }
>     /**
>      * Producer-Consumer queue
>      */
>     static public class Queue extends SyncPrimitive {
>         /**
>          * Constructor of producer-consumer queue
>          *
>          * @param address
>          * @param name
>          */
>         Queue(String address, String name) {
>             super(address);
>             this.root = name;
>             // Create ZK node name
>             if (zk != null) {
>                 try {
>                     Stat s = zk.exists(root, false);
>                     if (s == null) {
>                         zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
>                                 CreateMode.PERSISTENT);
>                     }
>                 } catch (KeeperException e) {
>                     System.out
>                             .println("Keeper exception when instantiating queue: "
>                                     + e.toString());
>                 } catch (InterruptedException e) {
>                     System.out.println("Interrupted exception");
>                 }
>             }
>         }
>         /**
>          * Add element to the queue.
>          *
>          * @param i
>          * @return
>          */
>         boolean produce(int i) throws KeeperException, InterruptedException{
>             ByteBuffer b = ByteBuffer.allocate(4);
>             byte[] value;
>             // Add child with value i
>             b.putInt(i);
>             value = b.array();
>             zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
>                         CreateMode.PERSISTENT_SEQUENTIAL);
>             return true;
>         }
>         /**
>          * Remove first element from the queue.
>          *
>          * @return
>          * @throws KeeperException
>          * @throws InterruptedException
>          */
>         int consume() throws KeeperException, InterruptedException{
>             int retvalue = -1;
>             Stat stat = null;
>             // Get the first element available
>             while (true) {
>                 synchronized (mutex) {
>                     List<String> list = zk.getChildren(root, true);
>                     if (list.size() == 0) {
>                         System.out.println("Going to wait");
>                         mutex.wait();
>                     } else {
>                         Integer min = new Integer(list.get(0).substring(7));
>                         for(String s : list){
>                             Integer tempValue = new Integer(s.substring(7));
>                             //System.out.println("Temporary value: " + tempValue);
>                             if(tempValue < min) min = tempValue;
>                         }
>                         // Sequential znode names end in a zero-padded 10-digit counter,
>                         // so rebuild the full child name before reading it.
>                         StringBuilder nodeBuilder = new StringBuilder("/element");
>                         String min_string = "" + min;
>                         for (int most = 10; most > min_string.length(); most--) {
>                             nodeBuilder.append("0");
>                         }
>                         nodeBuilder.append(min);
>                         // element already ends with the padded counter; do not append min again
>                         String element = nodeBuilder.toString();
>
>                         System.out.println("Temporary value: " + root + element);
>                         byte[] b = zk.getData(root + element,
>                                     false, stat);
>                         zk.delete(root + element, 0);
>                         ByteBuffer buffer = ByteBuffer.wrap(b);
>                         retvalue = buffer.getInt();
>                         return retvalue;
>                     }
>                 }
>             }
>         }
>     }
>     public static void main(String args[]) {
>         if (args[0].equals("qTest"))
>             queueTest(args);
>         else
>             barrierTest(args);
>     }
>     public static void queueTest(String args[]) {
>         Queue q = new Queue(args[1], "/app1");
>         System.out.println("Input: " + args[1]);
>         int i;
>         Integer max = new Integer(args[2]);
>         if (args[3].equals("p")) {
>             System.out.println("Producer");
>             for (i = 0; i < max; i++)
>                 try{
>                     q.produce(10 + i);
>                 } catch (KeeperException e){
>                 } catch (InterruptedException e){
>                 }
>         } else {
>             System.out.println("Consumer");
>             for (i = 0; i < max; i++) {
>                 try{
>                     int r = q.consume();
>                     System.out.println("Item: " + r);
>                 } catch (KeeperException e){
>                     i--;
>                 } catch (InterruptedException e){
>                 }
>             }
>         }
>     }
>     public static void barrierTest(String args[]) {
>         Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
>         try{
>             boolean flag = b.enter();
>             System.out.println("Entered barrier: " + args[2]);
>             if(!flag) System.out.println("Error when entering the barrier");
>         } catch (KeeperException e){
>         } catch (InterruptedException e){
>         }
>         // Generate random integer
>         Random rand = new Random();
>         int r = rand.nextInt(100);
>         // Loop for rand iterations
>         for (int i = 0; i < r; i++) {
>             try {
>                 Thread.sleep(100);
>             } catch (InterruptedException e) {
>             }
>         }
>         try{
>             b.leave();
>         } catch (KeeperException e){
>         } catch (InterruptedException e){
>         }
>         System.out.println("Left barrier");
>     }
> }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (ZOOKEEPER-1418) Just a bug in the tutorial code on the website

Posted by "Joe Gamache (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/ZOOKEEPER-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joe Gamache updated ZOOKEEPER-1418:
-----------------------------------

    Description: 
When I ran the Queue example from here: http://zookeeper.apache.org/doc/trunk/zookeeperTutorial.html
The producer created entries of the form:  /app1/element0000000001...
but the consumer tried to consume entries of the form:  /app1/element1...

Adding a patch with the file attached.
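
For reference, the mismatch described above can be reproduced without a ZooKeeper server. The following is only a minimal sketch, not the attached patch: the class and method names (SequentialNames, childName, firstChild) are made up for illustration, and it assumes every child of the queue node was created with the "element" prefix and a non-negative sequence counter, which ZooKeeper formats as 10 zero-padded digits.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the tutorial or of the ZooKeeper API.
public class SequentialNames {
    static final String PREFIX = "element";

    // Rebuild a child name from its sequence number, zero-padded to 10 digits.
    public static String childName(int sequence) {
        return String.format("%s%010d", PREFIX, sequence);
    }

    // With fixed-width padding, lexicographic order matches numeric order,
    // so the smallest name is the oldest element (assumes a shared prefix).
    public static String firstChild(List<String> children) {
        return Collections.min(children);
    }

    public static void main(String[] args) {
        // Names as getChildren() would return them for the queue node.
        List<String> children = Arrays.asList(
                "element0000000012", "element0000000001", "element0000000007");

        String first = firstChild(children);
        int sequence = Integer.parseInt(first.substring(PREFIX.length()));

        // Both lines print the same path: /app1/element0000000001
        System.out.println("/app1/" + first);
        System.out.println("/app1/" + childName(sequence));
    }
}
```

Using the child name exactly as returned by getChildren() avoids the parse-and-reformat round trip entirely; childName() is shown only to make the padding explicit.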

  was:
When I ran the Queue example from here: http://zookeeper.apache.org/doc/trunk/zookeeperTutorial.html
The producer created entries of the form:  /app1/element0000000001...
but the consumer tried to consume of the form:  /app1/element1...

I fixed the code, but don't see how to attach a file??  Ok, here is the code:


import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class SyncPrimitive implements Watcher {

    static ZooKeeper zk = null;
    static Integer mutex;

    String root;

    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
        //else mutex = new Integer(-1);
    }

    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            //System.out.println("Process: " + event.getType());
            mutex.notify();
        }
    }

    /**
     * Barrier
     */
    static public class Barrier extends SyncPrimitive {
        int size;
        String name;

        /**
         * Barrier constructor
         *
         * @param address
         * @param root
         * @param size
         */
        Barrier(String address, String root, int size) {
            super(address);
            this.root = root;
            this.size = size;

            // Create barrier node
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }

            // My node name
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                System.out.println(e.toString());
            }

        }

        /**
         * Join barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean enter() throws KeeperException, InterruptedException{
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);

                    if (list.size() < size) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
        }

        /**
         * Wait until all reach barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean leave() throws KeeperException, InterruptedException{
            zk.delete(root + "/" + name, 0);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                    if (list.size() > 0) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
        }
    }

    /**
     * Producer-Consumer queue
     */
    static public class Queue extends SyncPrimitive {

        /**
         * Constructor of producer-consumer queue
         *
         * @param address
         * @param name
         */
        Queue(String address, String name) {
            super(address);
            this.root = name;
            // Create ZK node name
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }

        /**
         * Add element to the queue.
         *
         * @param i
         * @return
         */

        boolean produce(int i) throws KeeperException, InterruptedException{
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;

            // Add child with value i
            b.putInt(i);
            value = b.array();
            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);

            return true;
        }


        /**
         * Remove first element from the queue.
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        int consume() throws KeeperException, InterruptedException{
            int retvalue = -1;
            Stat stat = null;

            // Get the first element available
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                    if (list.size() == 0) {
                        System.out.println("Going to wait");
                        mutex.wait();
                    } else {
                        Integer min = new Integer(list.get(0).substring(7));
                        for(String s : list){
                            Integer tempValue = new Integer(s.substring(7));
                            //System.out.println("Temporary value: " + tempValue);
                            if(tempValue < min) min = tempValue;
                        }
                        // Sequential znode names end in a zero-padded 10-digit counter,
                        // so rebuild the full child name before reading it.
                        StringBuilder nodeBuilder = new StringBuilder("/element");
                        String min_string = "" + min;
                        for (int most = 10; most > min_string.length(); most--) {
                            nodeBuilder.append("0");
                        }
                        nodeBuilder.append(min);
                        // element already ends with the padded counter; do not append min again
                        String element = nodeBuilder.toString();

                        System.out.println("Temporary value: " + root + element);
                        byte[] b = zk.getData(root + element,
                                    false, stat);
                        zk.delete(root + element, 0);
                        ByteBuffer buffer = ByteBuffer.wrap(b);
                        retvalue = buffer.getInt();

                        return retvalue;
                    }
                }
            }
        }
    }

    public static void main(String args[]) {
        if (args[0].equals("qTest"))
            queueTest(args);
        else
            barrierTest(args);

    }

    public static void queueTest(String args[]) {
        Queue q = new Queue(args[1], "/app1");

        System.out.println("Input: " + args[1]);
        int i;
        Integer max = new Integer(args[2]);

        if (args[3].equals("p")) {
            System.out.println("Producer");
            for (i = 0; i < max; i++)
                try{
                    q.produce(10 + i);
                } catch (KeeperException e){

                } catch (InterruptedException e){

                }
        } else {
            System.out.println("Consumer");

            for (i = 0; i < max; i++) {
                try{
                    int r = q.consume();
                    System.out.println("Item: " + r);
                } catch (KeeperException e){
                    i--;
                } catch (InterruptedException e){

                }
            }
        }
    }

    public static void barrierTest(String args[]) {
        Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
        try{
            boolean flag = b.enter();
            System.out.println("Entered barrier: " + args[2]);
            if(!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }

        // Generate random integer
        Random rand = new Random();
        int r = rand.nextInt(100);
        // Loop for rand iterations
        for (int i = 0; i < r; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {

            }
        }
        try{
            b.leave();
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }
        System.out.println("Left barrier");
    }
}

    
> Just a bug in the tutorial code on the website
> ----------------------------------------------
>
>                 Key: ZOOKEEPER-1418
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-1418
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: documentation
>    Affects Versions: 3.4.3
>            Reporter: Joe Gamache
>            Assignee: Joe Gamache
>            Priority: Minor
>
> When I ran the Queue example from here: http://zookeeper.apache.org/doc/trunk/zookeeperTutorial.html
> The producer created entries of the form:  /app1/element0000000001...
> but the consumer tried to consume entries of the form:  /app1/element1...
> Adding a patch with the file attached.


        

[jira] [Assigned] (ZOOKEEPER-1418) Just a bug in the tutorial code on the website

Posted by "Joe Gamache (Assigned) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/ZOOKEEPER-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joe Gamache reassigned ZOOKEEPER-1418:
--------------------------------------

    Assignee: Patrick Hunt  (was: Joe Gamache)

Hi, sorry to assign this back to you. I added the updated file as a patch, but the system wasn't all that pleased :). I'm not 100% sure this code exists anywhere in the code base, so there is nothing to "patch" in the traditional sense. Rather, the website page that contains this code needs to be updated.
                
> Just a bug in the tutorial code on the website
> ----------------------------------------------
>
>                 Key: ZOOKEEPER-1418
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-1418
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: documentation
>    Affects Versions: 3.4.3
>            Reporter: Joe Gamache
>            Assignee: Patrick Hunt
>            Priority: Minor
>         Attachments: SyncPrimitive.java
>
>
> When I ran the Queue example from here: http://zookeeper.apache.org/doc/trunk/zookeeperTutorial.html
> The producer created entries of the form:  /app1/element0000000001...
> but the consumer tried to consume entries of the form:  /app1/element1...
> Adding a patch with the file attached.


        

[jira] [Assigned] (ZOOKEEPER-1418) Just a bug in the tutorial code on the website

Posted by "Patrick Hunt (Assigned) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/ZOOKEEPER-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Hunt reassigned ZOOKEEPER-1418:
---------------------------------------

    Assignee: Joe Gamache  (was: Patrick Hunt)
    
> Just a bug in the tutorial code on the website
> ----------------------------------------------
>
>                 Key: ZOOKEEPER-1418
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-1418
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: documentation
>    Affects Versions: 3.4.3
>            Reporter: Joe Gamache
>            Assignee: Joe Gamache
>            Priority: Minor
>         Attachments: SyncPrimitive.java
>
>
> When I ran the Queue example from here: http://zookeeper.apache.org/doc/trunk/zookeeperTutorial.html
> The producer created entries of the form:  /app1/element0000000001...
> but the consumer tried to consume entries of the form:  /app1/element1...
> Adding a patch with the file attached.
