Posted to commits@zookeeper.apache.org by ph...@apache.org on 2008/10/10 15:58:04 UTC

svn commit: r703480 [3/3] - in /hadoop/zookeeper/trunk: ./ docs/ docs/images/ docs/skin/ src/docs/src/documentation/ src/docs/src/documentation/content/xdocs/ src/docs/src/documentation/resources/images/

Added: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml?rev=703480&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml (added)
+++ hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml Fri Oct 10 06:58:03 2008
@@ -0,0 +1,674 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright 2002-2004 The Apache Software Foundation
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE article PUBLIC "-//OASIS//DTD Simplified DocBook XML V1.0//EN"
+"http://www.oasis-open.org/docbook/xml/simple/1.0/sdocbook.dtd">
+<article id="ar_Tutorial">
+  <title>Programming with ZooKeeper - A basic tutorial</title>
+
+  <articleinfo>
+    <legalnotice>
+      <para>Licensed under the Apache License, Version 2.0 (the "License");
+      you may not use this file except in compliance with the License. You may
+      obtain a copy of the License at <ulink
+      url="http://www.apache.org/licenses/LICENSE-2.0">http://www.apache.org/licenses/LICENSE-2.0</ulink>.</para>
+
+      <para>Unless required by applicable law or agreed to in writing,
+      software distributed under the License is distributed on an "AS IS"
+      BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied. See the License for the specific language governing permissions
+      and limitations under the License.</para>
+    </legalnotice>
+
+    <abstract>
+      <para>This article contains sample Java code for simple implementations of barriers
+      and producer-consumer queues.</para>
+
+    </abstract>
+  </articleinfo>
+
+  <section id="ch_Introduction">
+    <title>Introduction</title>
+
+    <para>In this tutorial, we show simple implementations of barriers and 
+    producer-consumer queues using ZooKeeper. We call the respective classes Barrier and Queue. 
+    These examples assume that you have at least one ZooKeeper server running.</para>
+    
+    <para>Both primitives use the following common excerpt of code:</para>
+    
+    <programlisting>
+static ZooKeeper zk = null;
+static Integer mutex;
+
+String root;
+
+SyncPrimitive(String address) {
+	if(zk == null){
+	    try {
+		System.out.println("Starting ZK:");
+		zk = new ZooKeeper(address, 3000, this);
+		mutex = new Integer(-1);
+		System.out.println("Finished starting ZK: " + zk);
+	    } catch (KeeperException e) {
+		System.out.println("Keeper exception when starting new session: "
+			+ e.toString());
+		zk = null;
+	    } catch (IOException e) {
+		System.out.println(e.toString());
+		zk = null;
+	    }
+	}
+}
+
+synchronized public void process(WatcherEvent event) {
+	synchronized (mutex) {
+	    mutex.notify();
+	}
+}
+
+</programlisting>
+
+<para>Both classes extend SyncPrimitive. In this way, we execute steps that are 
+common to all primitives in the constructor of SyncPrimitive. To keep the examples 
+simple, we create a ZooKeeper object the first time we instantiate either a barrier 
+object or a queue object, and we declare a static variable that is a reference 
+to this object. The subsequent instances of Barrier and Queue check whether a 
+ZooKeeper object exists. Alternatively, we could have the application create a
+ZooKeeper object and pass it to the constructors of Barrier and Queue.</para>
+<para>
+We use the process() method to handle notifications triggered by watches. 
+In the following discussion, we present code that sets watches. A watch is an internal 
+structure that enables ZooKeeper to notify a client of a change to a node. For example, 
+if a client is waiting for other clients to leave a barrier, then it can set a watch and 
+wait for modifications to a particular node, which can indicate that the wait is over. 
+This point becomes clear once we go over the examples.
+</para>
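+<para>
+The notification handshake described above can be tried without a ZooKeeper server. 
+The following is a minimal sketch and not part of the tutorial's source: the class 
+WatchHandshakeSketch and its methods onChange() and awaitChildren() are hypothetical 
+names, and a plain counter stands in for state kept on the server. It illustrates 
+the pattern that process() and the waiting methods rely on: the waiter re-checks its 
+condition in a loop, and the notifier wakes it while holding the same lock.
+</para>

```java
// Local stand-in for the watch handshake; no ZooKeeper calls are made.
class WatchHandshakeSketch {
    // Dedicated lock object playing the role of the tutorial's "mutex".
    private final Object mutex = new Object();
    // Hypothetical stand-in for server-side state (e.g. a child count).
    private int children = 0;

    // Plays the role of process(): something changed, wake up any waiter.
    void onChange() {
        synchronized (mutex) {
            children++;
            mutex.notifyAll();
        }
    }

    // Blocks until at least `expected` changes have been observed.
    int awaitChildren(int expected) throws InterruptedException {
        synchronized (mutex) {
            // Re-check after every wake-up: a notification only says that
            // something changed, not that the condition now holds.
            while (children < expected) {
                mutex.wait();
            }
            return children;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WatchHandshakeSketch sketch = new WatchHandshakeSketch();
        Thread notifier = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                sketch.onChange();
            }
        });
        notifier.start();
        System.out.println("Observed children: " + sketch.awaitChildren(3));
        notifier.join();
    }
}
```

+<para>
+The sketch locks on a dedicated Object instead of an Integer. That is a choice made 
+for this example only; the loop around wait() is the part that carries over to the 
+tutorial's enter(), leave(), and consume() methods.
+</para>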
+</section>
+   
+ <section id="sc_barriers"><title>Barriers</title>
+ 
+ <para>
+ A barrier is a primitive that enables a group of processes to synchronize the 
+ beginning and the end of a computation. The general idea of this implementation 
+ is to have a barrier node that serves the purpose of being a parent for individual 
+ process nodes. Suppose that we call the barrier node "/b1". Each process "p" then 
+ creates a node "/b1/p". Once enough processes have created their corresponding 
+ nodes, joined processes can start the computation.
+ </para>
+ 
+ <para>In this example, each process instantiates a Barrier object, and its constructor takes as parameters:</para>
+
+ <itemizedlist><listitem><para>the address of a ZooKeeper server (e.g., "zoo1.foo.com:2181")</para></listitem>
+<listitem><para>the path of the barrier node on ZooKeeper (e.g., "/b1")</para></listitem>
+<listitem><para>the size of the group of processes</para></listitem>
+</itemizedlist>
+
+<para>The constructor of Barrier passes the address of the ZooKeeper server to the 
+constructor of the parent class. The parent class creates a ZooKeeper instance if 
+one does not exist. The constructor of Barrier then creates a 
+barrier node on ZooKeeper, which is the parent node of all process nodes and 
+which we call root (<emphasis role="bold">Note:</emphasis> This is not the ZooKeeper root "/").</para>
+
+<programlisting>
+ /**
+ * Barrier constructor
+ *
+ * @param address
+ * @param name
+ * @param size
+ */
+Barrier(String address, String name, int size) {
+    super(address);
+    this.root = name;
+    this.size = size;
+
+    // Create barrier node
+    if (zk != null) {
+	try {
+	    Stat s = zk.exists(root, false);
+	    if (s == null) {
+		zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+	    }
+	} catch (KeeperException e) {
+	    System.out.println("Keeper exception when instantiating barrier: " + e.toString());
+	} catch (InterruptedException e) {
+	    System.out.println("Interrupted exception");
+	}
+    }
+
+    // My node name: assign the field, since the parameter "name" shadows it
+    try {
+	this.name = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException e) {
+	System.out.println(e.toString());
+    }
+
+}
+</programlisting>
+<para>
+To enter the barrier, a process calls enter(). The process creates a node under 
+the root to represent itself, using its host name to form the node name. It then waits 
+until enough processes have entered the barrier. A process does this by checking 
+the number of children the root node has with "getChildren()", and waiting for 
+notifications if there are not yet enough. To receive a notification when 
+there is a change to the root node, a process has to set a watch, which it does 
+through the call to "getChildren()". In the code, "getChildren()" 
+has two parameters. The first one states the node to read from, and the second is
+a boolean flag that enables the process to set a watch. In the code the flag is true.
+</para>
+
+<programlisting>
+ /**
+ * Join barrier
+ *
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+
+boolean enter() throws KeeperException, InterruptedException{
+    zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
+	    CreateFlags.EPHEMERAL);
+    while (true) {
+	synchronized (mutex) {
+	    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
+
+	    if (list.size() &lt; size) {
+		mutex.wait();
+	    } else {
+		return true;
+	    }
+	}
+    }
+}
+</programlisting>
+<para>
+Note that enter() throws both KeeperException and InterruptedException, so it is 
+the responsibility of the application to catch and handle such exceptions.</para>
+
+<para>
+Once the computation is finished, a process calls leave() to leave the barrier. 
+First it deletes its corresponding node, and then it gets the children of the root 
+node. If there is at least one child, then it waits for a notification (note 
+that the second parameter of the call to getChildren() is true, meaning that 
+ZooKeeper has to set a watch on the root node). Upon reception of a notification, 
+it checks once more whether the root node has any children.</para>
+
+<programlisting>
+ /**
+ * Leave barrier and wait until all processes have left
+ *
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+
+boolean leave() throws KeeperException, InterruptedException{
+    zk.delete(root + "/" + name, 0);
+    while (true) {
+	synchronized (mutex) {
+	    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
+	    if (list.size() &gt; 0) {
+		mutex.wait();
+	    } else {
+		return true;
+	    }
+	}
+    }
+}
+</programlisting>
+</section>
+<section id="sc_producerConsumerQueues"><title>Producer-Consumer Queues</title>
+<para>
+A producer-consumer queue is a distributed data structure that a group of processes 
+uses to generate and consume items. Producer processes create new elements and add 
+them to the queue. Consumer processes remove elements from the queue and process them. 
+In this implementation, the elements are simple integers. The queue is represented 
+by a root node, and to add an element to the queue, a producer process creates a new node, 
+a child of the root node.
+</para>
+
+<para>
+The following excerpt of code corresponds to the constructor of the object. As 
+with Barrier objects, it first calls the constructor of the parent class, SyncPrimitive, 
+which creates a ZooKeeper object if one doesn't exist. It then checks whether the root 
+node of the queue exists, and creates it if it doesn't.
+</para>
+<programlisting>
+/**
+ * Constructor of producer-consumer queue
+ *
+ * @param address
+ * @param name
+ */
+Queue(String address, String name) {
+    super(address);
+    this.root = name;
+    // Create ZK node name
+    if (zk != null) {
+	try {
+	    Stat s = zk.exists(root, false);
+	    if (s == null) {
+		zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+	    }
+	} catch (KeeperException e) {
+	    System.out
+		    .println("Keeper exception when instantiating queue: "
+			    + e.toString());
+	} catch (InterruptedException e) {
+	    System.out.println("Interrupted exception");
+	}
+    }
+}
+ </programlisting>
+ 
+<para>
+A producer process calls "produce()" to add an element to the queue, and passes 
+an integer as an argument. To add an element to the queue, the method creates a 
+new node using "create()", and uses the SEQUENCE flag to instruct ZooKeeper to 
+append the value of the sequence counter associated with the root node to the node name. 
+In this way, we impose a total order on the elements of the queue, thus guaranteeing that the 
+oldest element of the queue is the next one consumed.
+</para>
+
+<programlisting>
+/**
+ * Add element to the queue.
+ *
+ * @param i
+ * @return
+ */
+
+boolean produce(int i) throws KeeperException, InterruptedException{
+    ByteBuffer b = ByteBuffer.allocate(4);
+    byte[] value;
+
+    // Add child with value i
+    b.putInt(i);
+    value = b.array();
+    zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
+		CreateFlags.SEQUENCE);
+
+    return true;
+}
+</programlisting>
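+<para>
+The data stored in each queue node is the 4-byte encoding of the integer. As a 
+minimal sketch, assuming only java.nio.ByteBuffer with its default big-endian byte 
+order, the round trip between produce() and consume() looks as follows. The class 
+IntPayloadSketch and the helpers encode() and decode() are hypothetical names 
+introduced for illustration.
+</para>

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative helpers only; they are not part of the tutorial's classes.
class IntPayloadSketch {
    // Encode an int as the 4-byte payload that produce() stores in a node.
    static byte[] encode(int i) {
        return ByteBuffer.allocate(4).putInt(i).array();
    }

    // Decode a payload written by encode(), as consume() does after getData().
    static int decode(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] payload = encode(10);
        System.out.println(Arrays.toString(payload) + " decodes to " + decode(payload));
    }
}
```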
+<para>
+To consume an element, a consumer process obtains the children of the root node, 
+reads the node with the smallest counter value, deletes that node, and returns the element. Note that 
+if there is a conflict, then one of the two contending processes won't be able to 
+delete the node and the delete operation will throw an exception.</para>
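+<para>
+The conflict described above can be modeled locally. The following is a sketch under 
+stated assumptions rather than ZooKeeper code: a ConcurrentSkipListMap stands in for 
+the children of the root node, a failed conditional remove stands in for the 
+exception thrown by delete, and the class name ClaimByDeleteSketch is hypothetical. 
+Only the consumer whose removal succeeds returns the element; a consumer that loses 
+the race retries with the next oldest element.
+</para>

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// In-memory analogue of "read the oldest node, then delete it".
class ClaimByDeleteSketch {
    // Stand-in for the queue's children: sequence counter -> stored value.
    private final ConcurrentSkipListMap<Integer, Integer> elements =
            new ConcurrentSkipListMap<>();

    void produce(int counter, int value) {
        elements.put(counter, value);
    }

    // Returns the oldest element, or null if the queue is empty.
    Integer consume() {
        while (true) {
            Map.Entry<Integer, Integer> head = elements.firstEntry();
            if (head == null) {
                return null;
            }
            // remove(key, value) succeeds for exactly one contender; the
            // loser sees false (the analogue of a failed delete) and retries.
            if (elements.remove(head.getKey(), head.getValue())) {
                return head.getValue();
            }
        }
    }

    public static void main(String[] args) {
        ClaimByDeleteSketch queue = new ClaimByDeleteSketch();
        queue.produce(1, 10);
        queue.produce(2, 11);
        System.out.println("Consumed: " + queue.consume());
    }
}
```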
+
+<para>
+A call to getChildren() returns the list of children in lexicographic order. 
+As lexicographic order does not necessarily follow the numerical order of the counter 
+values, we need to decide which element is the smallest. To decide which one has 
+the smallest counter value, we traverse the list, remove the prefix "element" 
+from each name, and compare the remaining counter values numerically.</para>
+
+<programlisting>
+/**
+ * Remove first element from the queue.
+ *
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+int consume() throws KeeperException, InterruptedException{
+    int retvalue = -1;
+    Stat stat = null;
+
+    // Get the first element available
+    while (true) {
+	synchronized (mutex) {
+	    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
+	    if (list.size() == 0) {
+		System.out.println("Going to wait");
+		mutex.wait();
+	    } else {
+		Integer min = new Integer(list.get(0).substring(7));
+		for(String s : list){
+		    Integer tempValue = new Integer(s.substring(7));
+		    if(tempValue &lt; min) min = tempValue;
+		}
+		System.out.println("Temporary value: " + root + "/element" + min);
+		byte[] b = zk.getData(root + "/element" + min, false, stat);
+		zk.delete(root + "/element" + min, 0);
+		ByteBuffer buffer = ByteBuffer.wrap(b);
+		retvalue = buffer.getInt();
+
+		return retvalue;
+	    }
+	}
+    }
+}
+</programlisting>
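+<para>
+The selection step of consume() can be tried on its own. The sketch below assumes 
+child names of the form "element" followed by a decimal counter, as in the tutorial; 
+the class SmallestCounterSketch and the method smallestCounter() are hypothetical 
+names. It shows why string order alone is not enough: "element10" sorts before 
+"element9" lexicographically, although 9 is the smaller counter.
+</para>

```java
import java.util.Arrays;
import java.util.List;

// Illustrative helper only; it mirrors the loop inside consume().
class SmallestCounterSketch {
    static final String PREFIX = "element";

    // Returns the smallest numeric suffix among names such as "element12".
    static int smallestCounter(List<String> children) {
        int min = Integer.parseInt(children.get(0).substring(PREFIX.length()));
        for (String child : children) {
            int counter = Integer.parseInt(child.substring(PREFIX.length()));
            if (counter < min) {
                min = counter;
            }
        }
        return min;
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("element10", "element9", "element2");
        // String comparison puts "element10" first; numeric comparison does not.
        System.out.println("element10 before element9 as strings: "
                + ("element10".compareTo("element9") < 0));
        System.out.println("Smallest counter: " + smallestCounter(children));
    }
}
```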
+ 
+</section>
+<section id="sc_sourceListing"><title>Complete Source Listing</title>
+<example id="eg_SyncPrimitive_java">
+<title>SyncPrimitive.java</title>
+<programlisting>
+package com.yahoo.SyncPrimitive;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.lang.InterruptedException;
+import java.util.ArrayList;
+import java.util.Random;
+
+import com.yahoo.zookeeper.Watcher;
+import com.yahoo.zookeeper.data.Stat;
+import com.yahoo.zookeeper.proto.WatcherEvent;
+import com.yahoo.zookeeper.KeeperException;
+import com.yahoo.zookeeper.ZooDefs.Ids;
+import com.yahoo.zookeeper.ZooDefs.CreateFlags;
+import com.yahoo.zookeeper.ZooKeeper;
+
+public class SyncPrimitive implements Watcher {
+
+    static ZooKeeper zk = null;
+    static Integer mutex;
+    
+    String root;
+
+    SyncPrimitive(String address) {
+        if(zk == null){
+            try {
+                System.out.println("Starting ZK:");
+                zk = new ZooKeeper(address, 3000, this);
+                mutex = new Integer(-1);
+                System.out.println("Finished starting ZK: " + zk);
+            } catch (KeeperException e) {
+                System.out.println("Keeper exception when starting new session: "
+                        + e.toString());
+                zk = null;
+            } catch (IOException e) {
+                System.out.println(e.toString());
+                zk = null;
+            }
+        }
+        //else mutex = new Integer(-1);
+    }
+
+    synchronized public void process(WatcherEvent event) {
+        synchronized (mutex) {
+            //System.out.println("Process: " + event.getType());
+            mutex.notify();
+        }
+    }
+
+    /**
+     * Barrier
+     */
+    static public class Barrier extends SyncPrimitive {
+        int size;
+        String name;
+
+        /**
+         * Barrier constructor
+         * 
+         * @param address
+         * @param name
+         * @param size
+         */
+        Barrier(String address, String name, int size) {
+            super(address);
+            this.root = name;
+            this.size = size;
+
+            // Create barrier node
+            if (zk != null) {
+                try {
+                    Stat s = zk.exists(root, false);
+                    if (s == null) {
+                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+                    }
+                } catch (KeeperException e) {
+                    System.out
+                            .println("Keeper exception when instantiating barrier: "
+                                    + e.toString());
+                } catch (InterruptedException e) {
+                    System.out.println("Interrupted exception");
+                }
+            }
+            
+            // My node name: assign the field, since the parameter "name" shadows it
+            try {
+                this.name = InetAddress.getLocalHost().getCanonicalHostName();
+            } catch (UnknownHostException e) {
+                System.out.println(e.toString());
+            }
+            
+        }
+
+        /**
+         * Join barrier
+         * 
+         * @return
+         * @throws KeeperException
+         * @throws InterruptedException
+         */
+        
+        boolean enter() throws KeeperException, InterruptedException{
+            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateFlags.EPHEMERAL);
+            while (true) {
+                synchronized (mutex) {                    
+                    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
+       
+                    if (list.size() &lt; size) {
+                        mutex.wait();
+                    } else {
+                        return true;
+                    }
+                }
+            }
+        }
+
+        /**
+         * Leave barrier and wait until all processes have left
+         * 
+         * @return
+         * @throws KeeperException
+         * @throws InterruptedException
+         */
+        
+        boolean leave() throws KeeperException, InterruptedException{
+            zk.delete(root + "/" + name, 0);
+            while (true) {
+                synchronized (mutex) {
+                    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
+                    if (list.size() &gt; 0) {
+                        mutex.wait();
+                    } else {
+                        return true;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Producer-Consumer queue
+     */
+    static public class Queue extends SyncPrimitive {
+
+        /**
+         * Constructor of producer-consumer queue
+         * 
+         * @param address
+         * @param name
+         */
+        Queue(String address, String name) {
+            super(address);
+            this.root = name;
+            // Create ZK node name
+            if (zk != null) {
+                try {
+                    Stat s = zk.exists(root, false);
+                    if (s == null) {
+                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+                    }
+                } catch (KeeperException e) {
+                    System.out
+                            .println("Keeper exception when instantiating queue: "
+                                    + e.toString());
+                } catch (InterruptedException e) {
+                    System.out.println("Interrupted exception");
+                }
+            }
+        }
+
+        /**
+         * Add element to the queue.
+         * 
+         * @param i
+         * @return
+         */
+        
+        boolean produce(int i) throws KeeperException, InterruptedException{
+            ByteBuffer b = ByteBuffer.allocate(4);
+            byte[] value;
+
+            // Add child with value i
+            b.putInt(i);
+            value = b.array();
+            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
+                        CreateFlags.SEQUENCE);
+            
+            return true;
+        }
+
+
+        /**
+         * Remove first element from the queue.
+         * 
+         * @return
+         * @throws KeeperException
+         * @throws InterruptedException
+         */
+        int consume() throws KeeperException, InterruptedException{
+            int retvalue = -1;
+            Stat stat = null;
+            
+            // Get the first element available
+            while (true) {
+                synchronized (mutex) {
+                    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
+                    if (list.size() == 0) {
+                        System.out.println("Going to wait");
+                        mutex.wait();
+                    } else {
+                        Integer min = new Integer(list.get(0).substring(7));
+                        for(String s : list){
+                            Integer tempValue = new Integer(s.substring(7));
+                            //System.out.println("Temporary value: " + tempValue);
+                            if(tempValue &lt; min) min = tempValue;
+                        }
+                        System.out.println("Temporary value: " + root + "/element" + min);
+                        byte[] b = zk.getData(root + "/element" + min,
+                                    false, stat);
+                        zk.delete(root + "/element" + min, 0);
+                        ByteBuffer buffer = ByteBuffer.wrap(b);
+                        retvalue = buffer.getInt();
+                        
+                        return retvalue;
+                    }
+                }
+            }           
+        }
+    }
+
+    public static void main(String args[]) {
+        if (args[0].equals("qTest"))
+            queueTest(args);
+        else
+            barrierTest(args);
+
+    }
+
+    public static void queueTest(String args[]) {
+        Queue q = new Queue(args[1], "/app1");
+
+        System.out.println("Input: " + args[1]);
+        int i;
+        Integer max = new Integer(args[2]);
+
+        if (args[3].equals("p")) {
+            System.out.println("Producer");
+            for (i = 0; i &lt; max; i++)
+                try{    
+                    q.produce(10 + i);
+                } catch (KeeperException e){
+                    
+                } catch (InterruptedException e){
+                    
+                }
+        } else {
+            System.out.println("Consumer");
+   
+            for (i = 0; i &lt; max; i++) {
+                try{
+                    int r = q.consume();
+                    System.out.println("Item: " + r);
+                } catch (KeeperException e){
+                    i--;
+                } catch (InterruptedException e){
+                    
+                }
+            }
+        }
+    }
+
+    public static void barrierTest(String args[]) {
+        Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
+        try{
+            boolean flag = b.enter();
+            System.out.println("Entered barrier: " + args[2]);
+            if(!flag) System.out.println("Error when entering the barrier");
+        } catch (KeeperException e){
+            
+        } catch (InterruptedException e){
+            
+        }
+
+        // Generate random integer
+        Random rand = new Random();
+        int r = rand.nextInt(100);
+        // Loop for rand iterations
+        for (int i = 0; i &lt; r; i++) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+
+            }
+        }
+        try{
+            b.leave();
+        } catch (KeeperException e){
+            
+        } catch (InterruptedException e){
+            
+        }
+        System.out.println("Left barrier");
+    }
+}</programlisting></example>
+</section>
+
+</article>

Propchange: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/zookeeper/trunk/src/docs/src/documentation/resources/images/2pc.png
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/resources/images/2pc.png?rev=703480&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/zookeeper/trunk/src/docs/src/documentation/resources/images/2pc.png
------------------------------------------------------------------------------
    svn:mime-type = image/png

Modified: hadoop/zookeeper/trunk/src/docs/src/documentation/skinconf.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/skinconf.xml?rev=703480&r1=703479&r2=703480&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/docs/src/documentation/skinconf.xml (original)
+++ hadoop/zookeeper/trunk/src/docs/src/documentation/skinconf.xml Fri Oct 10 06:58:03 2008
@@ -142,10 +142,20 @@
       font-family: monospace;
     }
 
+    pre.code {
+      margin-left: 0em;
+      padding: 0.5em;
+      background-color: #f0f0f0;
+      font-family: monospace;
+    }
+
+<!-- patricks
     .code {
       font-family: "Courier New", Courier, monospace;
       font-size: 110%;
     }
+-->
+
   </extra-css>
 
   <colors>