You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2008/06/24 21:04:59 UTC

svn commit: r671303 [6/6] - in /hadoop/zookeeper/trunk/src/java: lib/cobertura/lib/ lib/svnant/ main/org/apache/zookeeper/ main/org/apache/zookeeper/server/ main/org/apache/zookeeper/server/auth/ main/org/apache/zookeeper/server/quorum/ main/org/apache...

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Tue Jun 24 12:04:58 2008
@@ -1,573 +1,573 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-
-import static org.apache.zookeeper.server.ServerConfig.getClientPort;
-import static org.apache.zookeeper.server.ServerConfig.getDataDir;
-import static org.apache.zookeeper.server.ServerConfig.getDataLogDir;
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionAlg;
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionPort;
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getInitLimit;
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServerId;
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServers;
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getSyncLimit;
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getTickTime;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-
-import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.InputArchive;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.txn.TxnHeader;
-
-/**
- * This class manages the quorum protocol. There are three states this server
- * can be in:
- * <ol>
- * <li>Leader election - each server will elect a leader (proposing itself as a
- * leader initially).</li>
- * <li>Follower - the server will synchronize with the leader and replicate any
- * transactions.</li>
- * <li>Leader - the server will process requests and forward them to followers.
- * A majority of followers must log the request before it can be accepted.
- * </ol>
- *
- * This class will setup a datagram socket that will always respond with its
- * view of the current leader. The response will take the form of:
- *
- * <pre>
- * int xid;
- *
- * long myid;
- *
- * long leader_id;
- *
- * long leader_zxid;
- * </pre>
- *
- * The request for the current leader will consist solely of an xid: int xid;
- *
- * <h2>Configuration file</h2>
- *
- * When the main() method of this class is used to start the program, the file
- * "zoo.cfg" in the current directory will be used to obtain configuration
- * information. zoo.cfg is a Properties file, so keys and values are separated
- * by equals (=) and the key/value pairs are separated by new lines. The
- * following keys are used in the configuration file:
- * <ol>
- * <li>dataDir - The directory where the zookeeper data is stored.</li>
- * <li>clientPort - The port used to communicate with clients.</li>
- * <li>tickTime - The duration of a tick in milliseconds. This is the basic
- * unit of time in zookeeper.</li>
- * <li>initLimit - The maximum number of ticks that a follower will wait to
- * initially synchronize with a leader.</li>
- * <li>syncLimit - The maximum number of ticks that a follower will wait for a
- * message (including heartbeats) from the leader.</li>
- * <li>server.<i>id</i> - This is the host:port that the server with the
- * given id will use for the quorum protocol.</li>
- * </ol>
- * In addition to the zoo.cfg file. There is a file in the data directory called
- * "myid" that contains the server id as an ASCII decimal value.
- */
-public class QuorumPeer extends Thread implements QuorumStats.Provider {
-    private static final Logger LOG = Logger.getLogger(QuorumPeer.class);
-
-    /**
-     * Create an instance of a quorum peer
-     */
-    public interface Factory{
-        public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory) throws IOException;
-        public NIOServerCnxn.Factory createConnectionFactory() throws IOException;
-    }
-
-    public static class QuorumServer {
-        public QuorumServer(long id, InetSocketAddress addr) {
-            this.id = id;
-            this.addr = addr;
-        }
-
-        public InetSocketAddress addr;
-
-        public long id;
-    }
-
-    public enum ServerState {
-        LOOKING, FOLLOWING, LEADING;
-    }
-    /**
-     * The servers that make up the cluster
-     */
-    ArrayList<QuorumServer> quorumPeers;
-    public int getQuorumSize(){
-        return quorumPeers.size();
-    }
-    /**
-     * My id
-     */
-    private long myid;
-
-
-    /**
-     * get the id of this quorum peer.
-     */
-    public long getId() {
-        return myid;
-    }
-
-    /**
-     * This is who I think the leader currently is.
-     */
-    volatile Vote currentVote;
-
-    volatile boolean running = true;
-
-    /**
-     * The number of milliseconds of each tick
-     */
-    int tickTime;
-
-    /**
-     * The number of ticks that the initial synchronization phase can take
-     */
-    int initLimit;
-
-    /**
-     * The number of ticks that can pass between sending a request and getting
-     * an acknowledgement
-     */
-    int syncLimit;
-
-    /**
-     * The current tick
-     */
-    int tick;
-
-    /**
-     * This class simply responds to requests for the current leader of this
-     * node.
-     * <p>
-     * The request contains just an xid generated by the requestor.
-     * <p>
-     * The response has the xid, the id of this server, the id of the leader,
-     * and the zxid of the leader.
-     *
-     * @author breed
-     *
-     */
-    class ResponderThread extends Thread {
-        ResponderThread() {
-            super("ResponderThread");
-        }
-
-        public void run() {
-            try {
-                byte b[] = new byte[36];
-                ByteBuffer responseBuffer = ByteBuffer.wrap(b);
-                DatagramPacket packet = new DatagramPacket(b, b.length);
-                while (true) {
-                    udpSocket.receive(packet);
-                    if (packet.getLength() != 4) {
-                        LOG.warn("Got more than just an xid! Len = "
-                                + packet.getLength());
-                    } else {
-                        responseBuffer.clear();
-                        responseBuffer.getInt(); // Skip the xid
-                        responseBuffer.putLong(myid);
-                        switch (state) {
-                        case LOOKING:
-                            responseBuffer.putLong(currentVote.id);
-                            responseBuffer.putLong(currentVote.zxid);
-                            break;
-                        case LEADING:
-                            responseBuffer.putLong(myid);
-                            try {
-                                responseBuffer.putLong(leader.lastProposed);
-                            } catch (NullPointerException npe) {
-                                // This can happen in state transitions,
-                                // just ignore the request
-                            }
-                            break;
-                        case FOLLOWING:
-                            responseBuffer.putLong(currentVote.id);
-                            try {
-                                responseBuffer.putLong(follower.getZxid());
-                            } catch (NullPointerException npe) {
-                                // This can happen in state transitions,
-                                // just ignore the request
-                            }
-                        }
-                        packet.setData(b);
-                        udpSocket.send(packet);
-                    }
-                    packet.setLength(b.length);
-                }
-            } catch (Exception e) {
-                LOG.warn("Unexpected exception",e);
-            } finally {
-                LOG.warn("QuorumPeer responder thread exited");
-            }
-        }
-    }
-
-    private ServerState state = ServerState.LOOKING;
-
-    public void setPeerState(ServerState newState){
-        state=newState;
-    }
-
-    public ServerState getPeerState(){
-        return state;
-    }
-
-    DatagramSocket udpSocket;
-
-    private InetSocketAddress myQuorumAddr;
-
-    public InetSocketAddress getQuorumAddress(){
-        return myQuorumAddr;
-    }
-
-    /**
-     * the directory where the snapshot is stored.
-     */
-    private File dataDir;
-
-    /**
-     * the directory where the logs are stored.
-     */
-    private File dataLogDir;
-
-    Election electionAlg;
-
-    int electionPort;
-
-    NIOServerCnxn.Factory cnxnFactory;
-
-    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
-            File dataLogDir, int electionAlg, int electionPort,long myid, int tickTime,
-            int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException {
-        super("QuorumPeer");
-        this.cnxnFactory = cnxnFactory;
-        this.quorumPeers = quorumPeers;
-        this.dataDir = dataDir;
-        this.electionPort = electionPort;
-        this.dataLogDir = dataLogDir;
-        this.myid = myid;
-        this.tickTime = tickTime;
-        this.initLimit = initLimit;
-        this.syncLimit = syncLimit;
-        currentVote = new Vote(myid, getLastLoggedZxid());
-        for (QuorumServer p : quorumPeers) {
-            if (p.id == myid) {
-                myQuorumAddr = p.addr;
-                break;
-            }
-        }
-        if (myQuorumAddr == null) {
-            throw new SocketException("My id " + myid + " not in the peer list");
-        }
-        if (electionAlg == 0) {
-            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
-            new ResponderThread().start();
-        }
-        this.electionAlg = createElectionAlgorithm(electionAlg);
-        QuorumStats.getInstance().setStatsProvider(this);
-    }
-
-    /**
-     * This constructor is only used by the existing unit test code.
-     */
-    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
-            File dataLogDir, int clientPort, int electionAlg, int electionPort,
-            long myid, int tickTime, int initLimit, int syncLimit) throws IOException {
-        this(quorumPeers,dataDir,dataLogDir,electionAlg,electionPort,myid,tickTime,
-                initLimit,syncLimit,new NIOServerCnxn.Factory(clientPort));
-    }
-    /**
-     *  The constructor uses the quorum peer config to instantiate the class
-     */
-    public QuorumPeer(NIOServerCnxn.Factory cnxnFactory) throws IOException {
-        this(getServers(), new File(getDataDir()), new File(getDataLogDir()),
-                getElectionAlg(), getElectionPort(),getServerId(),getTickTime(),
-                getInitLimit(), getSyncLimit(),cnxnFactory);
-    }
-
-    public Follower follower;
-    public Leader leader;
-
-    protected Follower makeFollower(File dataDir,File dataLogDir) throws IOException {
-        return new Follower(this, new FollowerZooKeeperServer(dataDir,
-                dataLogDir, this,new ZooKeeperServer.BasicDataTreeBuilder()));
-    }
-
-    protected Leader makeLeader(File dataDir,File dataLogDir) throws IOException {
-        return new Leader(this, new LeaderZooKeeperServer(dataDir, dataLogDir,
-                this,new ZooKeeperServer.BasicDataTreeBuilder()));
-    }
-
-    private Election createElectionAlgorithm(int electionAlgorithm){
-        Election le=null;
-        //TODO: use a factory rather than a switch
-        switch (electionAlgorithm) {
-        case 0:
-            // will create a new instance for each run of the protocol
-            break;
-        case 1:
-            le = new AuthFastLeaderElection(this, this.electionPort);
-            break;
-        case 2:
-            le = new AuthFastLeaderElection(this, this.electionPort, true);
-            break;
-        case 3:
-            le = new FastLeaderElection(this,
-                        new QuorumCnxManager(this.electionPort));
-        default:
-            assert false;
-        }
-        return le;
-    }
-
-    protected Election makeLEStrategy(){
-        if(electionAlg==null)
-            return new LeaderElection(this);
-        return electionAlg;
-    }
-
-    synchronized protected void setLeader(Leader newLeader){
-        leader=newLeader;
-    }
-
-    synchronized protected void setFollower(Follower newFollower){
-        follower=newFollower;
-    }
-
-    synchronized public ZooKeeperServer getActiveServer(){
-        if(leader!=null)
-            return leader.zk;
-        else if(follower!=null)
-            return follower.zk;
-        return null;
-    }
-
-    public void run() {
-        /*
-         * Main loop
-         */
-        while (running) {
-            switch (state) {
-            case LOOKING:
-                try {
-                    LOG.info("LOOKING");
-                    currentVote = makeLEStrategy().lookForLeader();
-                } catch (Exception e) {
-                    LOG.warn("Unexpected exception",e);
-                    state = ServerState.LOOKING;
-                }
-                break;
-            case FOLLOWING:
-                try {
-                    LOG.info("FOLLOWING");
-                    setFollower(makeFollower(dataDir,dataLogDir));
-                    follower.followLeader();
-                } catch (Exception e) {
-                    LOG.warn("Unexpected exception",e);
-                } finally {
-                    follower.shutdown();
-                    setFollower(null);
-                    state = ServerState.LOOKING;
-                }
-                break;
-            case LEADING:
-                LOG.info("LEADING");
-                try {
-                    setLeader(makeLeader(dataDir,dataLogDir));
-                    leader.lead();
-                    setLeader(null);
-                } catch (Exception e) {
-                    LOG.warn("Unexpected exception",e);
-                } finally {
-                    if (leader != null) {
-                        leader.shutdown("Forcing shutdown");
-                        setLeader(null);
-                    }
-                    state = ServerState.LOOKING;
-                }
-                break;
-            }
-        }
-        LOG.warn("QuorumPeer main thread exited");
-    }
-
-    public void shutdown() {
-        running = false;
-        if (leader != null) {
-            leader.shutdown("quorum Peer shutdown");
-        }
-        if (follower != null) {
-            follower.shutdown();
-        }
-        cnxnFactory.shutdown();
-        udpSocket.close();
-    }
-
-    long getLastLoggedZxid() {
-        File[] list = dataLogDir.listFiles();
-        if (list == null) {
-            return 0;
-        }
-        long maxLog = -1;
-        long maxSnapShot = 0;
-        for (File f : list) {
-            String name = f.getName();
-            if (name.startsWith("log.")) {
-                long zxid = ZooKeeperServer.getZxidFromName(f.getName(), "log");
-                if (zxid > maxLog) {
-                    maxLog = zxid;
-                }
-            } else if (name.startsWith("snapshot.")) {
-                long zxid = ZooKeeperServer.getZxidFromName(f.getName(),
-                        "snapshot");
-                if (zxid > maxLog) {
-                    maxSnapShot = zxid;
-                }
-            }
-        }
-        if (maxSnapShot > maxLog) {
-            return maxSnapShot;
-        }
-        long zxid = maxLog;
-        FileInputStream logStream = null;
-        try {
-            logStream = new FileInputStream(new File(dataLogDir, "log."
-                    + Long.toHexString(maxLog)));
-            BinaryInputArchive ia = BinaryInputArchive.getArchive(logStream);
-            while (true) {
-                byte[] bytes = ia.readBuffer("txnEntry");
-                if (bytes.length == 0) {
-                    // Since we preallocate, we define EOF to be an
-                    // empty transaction
-                    break;
-                }
-                int B = ia.readByte("EOR");
-                if (B != 'B') {
-                    break;
-                }
-                InputArchive bia = BinaryInputArchive
-                        .getArchive(new ByteArrayInputStream(bytes));
-                TxnHeader hdr = new TxnHeader();
-                hdr.deserialize(bia, "hdr");
-                zxid = hdr.getZxid();
-            }
-        } catch (IOException e) {
-            LOG.warn("Unexpected exception", e);
-        } finally {
-            try {
-                if (logStream != null) {
-                    logStream.close();
-                }
-            } catch (IOException e) {
-                LOG.warn("Unexpected exception",e);
-            }
-        }
-        return zxid;
-    }
-
-    public static void runPeer(QuorumPeer.Factory qpFactory) {
-        try {
-            QuorumStats.registerAsConcrete();
-            QuorumPeer self = qpFactory.create(qpFactory.createConnectionFactory());
-            self.start();
-            self.join();
-        } catch (Exception e) {
-            LOG.fatal("Unexpected exception",e);
-        }
-        System.exit(2);
-    }
-
-    public String[] getQuorumPeers() {
-        List<String> l = new ArrayList<String>();
-        synchronized (this) {
-            if (leader != null) {
-                synchronized (leader.followers) {
-                    for (FollowerHandler fh : leader.followers) {
-                        if (fh.s == null)
-                            continue;
-                        String s = fh.s.getRemoteSocketAddress().toString();
-                        if (leader.isFollowerSynced(fh))
-                            s += "*";
-                        l.add(s);
-                    }
-                }
-            } else if (follower != null) {
-                l.add(follower.sock.getRemoteSocketAddress().toString());
-            }
-        }
-        return l.toArray(new String[0]);
-    }
-
-    public String getServerState() {
-        switch (state) {
-        case LOOKING:
-            return QuorumStats.Provider.LOOKING_STATE;
-        case LEADING:
-            return QuorumStats.Provider.LEADING_STATE;
-        case FOLLOWING:
-            return QuorumStats.Provider.FOLLOWING_STATE;
-        }
-        return QuorumStats.Provider.UNKNOWN_STATE;
-    }
-
-    public static void main(String args[]) {
-        if (args.length == 2) {
-            ZooKeeperServer.main(args);
-            return;
-        }
-        QuorumPeerConfig.parse(args);
-
-        if (!QuorumPeerConfig.isStandalone()) {
-            runPeer(new QuorumPeer.Factory() {
-                public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory)
-                        throws IOException {
-                    return new QuorumPeer(cnxnFactory);
-                }
-                public NIOServerCnxn.Factory createConnectionFactory()
-                        throws IOException {
-                    return new NIOServerCnxn.Factory(getClientPort());
-                }
-            });
-        }else{
-            // there is only server in the quorum -- run as standalone
-            ZooKeeperServer.main(args);
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+
+import static org.apache.zookeeper.server.ServerConfig.getClientPort;
+import static org.apache.zookeeper.server.ServerConfig.getDataDir;
+import static org.apache.zookeeper.server.ServerConfig.getDataLogDir;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionAlg;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionPort;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getInitLimit;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServerId;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServers;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getSyncLimit;
+import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getTickTime;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * This class manages the quorum protocol. There are three states this server
+ * can be in:
+ * <ol>
+ * <li>Leader election - each server will elect a leader (proposing itself as a
+ * leader initially).</li>
+ * <li>Follower - the server will synchronize with the leader and replicate any
+ * transactions.</li>
+ * <li>Leader - the server will process requests and forward them to followers.
+ * A majority of followers must log the request before it can be accepted.</li>
+ * </ol>
+ *
+ * This class will setup a datagram socket that will always respond with its
+ * view of the current leader. The response will take the form of:
+ *
+ * <pre>
+ * int xid;
+ *
+ * long myid;
+ *
+ * long leader_id;
+ *
+ * long leader_zxid;
+ * </pre>
+ *
+ * The request for the current leader will consist solely of an xid: int xid;
+ *
+ * <h2>Configuration file</h2>
+ *
+ * When the main() method of this class is used to start the program, the file
+ * "zoo.cfg" in the current directory will be used to obtain configuration
+ * information. zoo.cfg is a Properties file, so keys and values are separated
+ * by equals (=) and the key/value pairs are separated by new lines. The
+ * following keys are used in the configuration file:
+ * <ol>
+ * <li>dataDir - The directory where the zookeeper data is stored.</li>
+ * <li>clientPort - The port used to communicate with clients.</li>
+ * <li>tickTime - The duration of a tick in milliseconds. This is the basic
+ * unit of time in zookeeper.</li>
+ * <li>initLimit - The maximum number of ticks that a follower will wait to
+ * initially synchronize with a leader.</li>
+ * <li>syncLimit - The maximum number of ticks that a follower will wait for a
+ * message (including heartbeats) from the leader.</li>
+ * <li>server.<i>id</i> - This is the host:port that the server with the
+ * given id will use for the quorum protocol.</li>
+ * </ol>
+ * In addition to the zoo.cfg file, there is a file in the data directory called
+ * "myid" that contains the server id as an ASCII decimal value.
+ */
+public class QuorumPeer extends Thread implements QuorumStats.Provider {
+    private static final Logger LOG = Logger.getLogger(QuorumPeer.class);
+
+    /**
+     * Create an instance of a quorum peer
+     */
+    public interface Factory{
+        public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory) throws IOException;
+        public NIOServerCnxn.Factory createConnectionFactory() throws IOException;
+    }
+
+    public static class QuorumServer {
+        public QuorumServer(long id, InetSocketAddress addr) {
+            this.id = id;
+            this.addr = addr;
+        }
+
+        public InetSocketAddress addr;
+
+        public long id;
+    }
+
+    public enum ServerState {
+        LOOKING, FOLLOWING, LEADING;
+    }
+    /**
+     * The servers that make up the cluster
+     */
+    ArrayList<QuorumServer> quorumPeers;
+    public int getQuorumSize(){
+        return quorumPeers.size();
+    }
+    /**
+     * My id
+     */
+    private long myid;
+
+
+    /**
+     * get the id of this quorum peer.
+     */
+    public long getId() {
+        return myid;
+    }
+
+    /**
+     * This is who I think the leader currently is.
+     */
+    volatile Vote currentVote;
+
+    volatile boolean running = true;
+
+    /**
+     * The number of milliseconds of each tick
+     */
+    int tickTime;
+
+    /**
+     * The number of ticks that the initial synchronization phase can take
+     */
+    int initLimit;
+
+    /**
+     * The number of ticks that can pass between sending a request and getting
+     * an acknowledgement
+     */
+    int syncLimit;
+
+    /**
+     * The current tick
+     */
+    int tick;
+
+    /**
+     * This class simply responds to requests for the current leader of this
+     * node.
+     * <p>
+     * The request contains just an xid generated by the requestor.
+     * <p>
+     * The response has the xid, the id of this server, the id of the leader,
+     * and the zxid of the leader.
+     *
+     * @author breed
+     *
+     */
+    class ResponderThread extends Thread {
+        ResponderThread() {
+            super("ResponderThread");
+        }
+
+        public void run() {
+            try {
+                byte b[] = new byte[36];
+                ByteBuffer responseBuffer = ByteBuffer.wrap(b);
+                DatagramPacket packet = new DatagramPacket(b, b.length);
+                while (true) {
+                    udpSocket.receive(packet);
+                    if (packet.getLength() != 4) {
+                        LOG.warn("Got more than just an xid! Len = "
+                                + packet.getLength());
+                    } else {
+                        responseBuffer.clear();
+                        responseBuffer.getInt(); // Skip the xid
+                        responseBuffer.putLong(myid);
+                        switch (state) {
+                        case LOOKING:
+                            responseBuffer.putLong(currentVote.id);
+                            responseBuffer.putLong(currentVote.zxid);
+                            break;
+                        case LEADING:
+                            responseBuffer.putLong(myid);
+                            try {
+                                responseBuffer.putLong(leader.lastProposed);
+                            } catch (NullPointerException npe) {
+                                // This can happen in state transitions,
+                                // just ignore the request
+                            }
+                            break;
+                        case FOLLOWING:
+                            responseBuffer.putLong(currentVote.id);
+                            try {
+                                responseBuffer.putLong(follower.getZxid());
+                            } catch (NullPointerException npe) {
+                                // This can happen in state transitions,
+                                // just ignore the request
+                            }
+                        }
+                        packet.setData(b);
+                        udpSocket.send(packet);
+                    }
+                    packet.setLength(b.length);
+                }
+            } catch (Exception e) {
+                LOG.warn("Unexpected exception",e);
+            } finally {
+                LOG.warn("QuorumPeer responder thread exited");
+            }
+        }
+    }
+
+    private ServerState state = ServerState.LOOKING;
+
+    public void setPeerState(ServerState newState){
+        state=newState;
+    }
+
+    public ServerState getPeerState(){
+        return state;
+    }
+
+    DatagramSocket udpSocket;
+
+    private InetSocketAddress myQuorumAddr;
+
+    public InetSocketAddress getQuorumAddress(){
+        return myQuorumAddr;
+    }
+
+    /**
+     * the directory where the snapshot is stored.
+     */
+    private File dataDir;
+
+    /**
+     * the directory where the logs are stored.
+     */
+    private File dataLogDir;
+
+    Election electionAlg;
+
+    int electionPort;
+
+    NIOServerCnxn.Factory cnxnFactory;
+
+    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
+            File dataLogDir, int electionAlg, int electionPort,long myid, int tickTime,
+            int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException {
+        super("QuorumPeer");
+        this.cnxnFactory = cnxnFactory;
+        this.quorumPeers = quorumPeers;
+        this.dataDir = dataDir;
+        this.electionPort = electionPort;
+        this.dataLogDir = dataLogDir;
+        this.myid = myid;
+        this.tickTime = tickTime;
+        this.initLimit = initLimit;
+        this.syncLimit = syncLimit;
+        currentVote = new Vote(myid, getLastLoggedZxid());
+        for (QuorumServer p : quorumPeers) {
+            if (p.id == myid) {
+                myQuorumAddr = p.addr;
+                break;
+            }
+        }
+        if (myQuorumAddr == null) {
+            throw new SocketException("My id " + myid + " not in the peer list");
+        }
+        if (electionAlg == 0) {
+            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
+            new ResponderThread().start();
+        }
+        this.electionAlg = createElectionAlgorithm(electionAlg);
+        QuorumStats.getInstance().setStatsProvider(this);
+    }
+
+    /**
+     * This constructor is only used by the existing unit test code.
+     * It builds a new {@code NIOServerCnxn.Factory} for {@code clientPort}
+     * and delegates everything else to the main constructor.
+     */
+    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
+            File dataLogDir, int clientPort, int electionAlg, int electionPort,
+            long myid, int tickTime, int initLimit, int syncLimit) throws IOException {
+        this(quorumPeers, dataDir, dataLogDir,
+                electionAlg, electionPort, myid,
+                tickTime, initLimit, syncLimit,
+                new NIOServerCnxn.Factory(clientPort));
+    }
+    /**
+     * Builds a peer whose settings all come from the statically imported
+     * quorum peer configuration getters; only the connection factory is
+     * supplied by the caller.
+     */
+    public QuorumPeer(NIOServerCnxn.Factory cnxnFactory) throws IOException {
+        this(getServers(),
+                new File(getDataDir()),
+                new File(getDataLogDir()),
+                getElectionAlg(),
+                getElectionPort(),
+                getServerId(),
+                getTickTime(),
+                getInitLimit(),
+                getSyncLimit(),
+                cnxnFactory);
+    }
+
+    public Follower follower;
+    public Leader leader;
+
+    /**
+     * Creates a {@code Follower} for this peer, backed by a new
+     * {@code FollowerZooKeeperServer} over the given directories.
+     */
+    protected Follower makeFollower(File dataDir,File dataLogDir) throws IOException {
+        FollowerZooKeeperServer followerServer = new FollowerZooKeeperServer(
+                dataDir, dataLogDir, this,
+                new ZooKeeperServer.BasicDataTreeBuilder());
+        return new Follower(this, followerServer);
+    }
+
+    /**
+     * Creates a {@code Leader} for this peer, backed by a new
+     * {@code LeaderZooKeeperServer} over the given directories.
+     */
+    protected Leader makeLeader(File dataDir,File dataLogDir) throws IOException {
+        LeaderZooKeeperServer leaderServer = new LeaderZooKeeperServer(
+                dataDir, dataLogDir, this,
+                new ZooKeeperServer.BasicDataTreeBuilder());
+        return new Leader(this, leaderServer);
+    }
+
+    /**
+     * Builds the election algorithm selected by {@code electionAlgorithm}.
+     *
+     * @param electionAlgorithm 0 yields {@code null} (makeLEStrategy() then
+     *        creates a fresh LeaderElection per run); 1 and 2 yield an
+     *        AuthFastLeaderElection (2 passes the extra {@code true} flag);
+     *        3 yields a FastLeaderElection over a new QuorumCnxManager
+     * @return the election instance, or {@code null} for algorithm 0 and for
+     *         unknown values when assertions are disabled
+     */
+    private Election createElectionAlgorithm(int electionAlgorithm){
+        Election le=null;
+        //TODO: use a factory rather than a switch
+        switch (electionAlgorithm) {
+        case 0:
+            // will create a new instance for each run of the protocol
+            break;
+        case 1:
+            le = new AuthFastLeaderElection(this, this.electionPort);
+            break;
+        case 2:
+            le = new AuthFastLeaderElection(this, this.electionPort, true);
+            break;
+        case 3:
+            le = new FastLeaderElection(this,
+                        new QuorumCnxManager(this.electionPort));
+            // Without this break, case 3 fell through into the default
+            // branch and tripped "assert false" whenever -ea was enabled.
+            break;
+        default:
+            assert false;
+        }
+        return le;
+    }
+
+    /**
+     * Returns the election strategy to run: the configured
+     * {@code electionAlg} when one exists, otherwise a brand new
+     * {@code LeaderElection} for this peer.
+     */
+    protected Election makeLEStrategy(){
+        if (electionAlg != null) {
+            return electionAlg;
+        }
+        return new LeaderElection(this);
+    }
+
+    /**
+     * Replaces the {@code leader} reference while holding this peer's monitor.
+     */
+    synchronized protected void setLeader(Leader newLeader) {
+        this.leader = newLeader;
+    }
+
+    /**
+     * Replaces the {@code follower} reference while holding this peer's monitor.
+     */
+    synchronized protected void setFollower(Follower newFollower) {
+        this.follower = newFollower;
+    }
+
+    synchronized public ZooKeeperServer getActiveServer(){
+        if(leader!=null)
+            return leader.zk;
+        else if(follower!=null)
+            return follower.zk;
+        return null;
+    }
+
+    /**
+     * Main loop of the peer thread. While {@code running} is set it
+     * dispatches on {@code state}:
+     * LOOKING runs the election strategy and records the resulting vote;
+     * FOLLOWING creates a follower and follows the leader;
+     * LEADING creates a leader and leads.
+     * Both FOLLOWING and LEADING reset the state to LOOKING when they end,
+     * whether normally or through an exception.
+     */
+    public void run() {
+        /*
+         * Main loop
+         */
+        while (running) {
+            switch (state) {
+            case LOOKING:
+                try {
+                    LOG.info("LOOKING");
+                    currentVote = makeLEStrategy().lookForLeader();
+                } catch (Exception e) {
+                    LOG.warn("Unexpected exception",e);
+                    state = ServerState.LOOKING;
+                }
+                break;
+            case FOLLOWING:
+                try {
+                    LOG.info("FOLLOWING");
+                    setFollower(makeFollower(dataDir,dataLogDir));
+                    follower.followLeader();
+                } catch (Exception e) {
+                    LOG.warn("Unexpected exception",e);
+                } finally {
+                    // makeFollower() may have thrown before the follower was
+                    // installed; an unguarded shutdown() would then raise a
+                    // NullPointerException out of this finally block and
+                    // terminate the peer thread.
+                    if (follower != null) {
+                        follower.shutdown();
+                        setFollower(null);
+                    }
+                    state = ServerState.LOOKING;
+                }
+                break;
+            case LEADING:
+                LOG.info("LEADING");
+                try {
+                    setLeader(makeLeader(dataDir,dataLogDir));
+                    leader.lead();
+                    setLeader(null);
+                } catch (Exception e) {
+                    LOG.warn("Unexpected exception",e);
+                } finally {
+                    // Only reached with a live leader when lead() threw.
+                    if (leader != null) {
+                        leader.shutdown("Forcing shutdown");
+                        setLeader(null);
+                    }
+                    state = ServerState.LOOKING;
+                }
+                break;
+            }
+        }
+        LOG.warn("QuorumPeer main thread exited");
+    }
+
+    /**
+     * Stops the peer: clears {@code running} so the main loop exits, shuts
+     * down the current leader and/or follower if present, shuts down the
+     * connection factory and closes the UDP socket if one was opened.
+     */
+    public void shutdown() {
+        running = false;
+        if (leader != null) {
+            leader.shutdown("quorum Peer shutdown");
+        }
+        if (follower != null) {
+            follower.shutdown();
+        }
+        cnxnFactory.shutdown();
+        // The socket is only created by the constructor for election
+        // algorithm 0; for every other algorithm it stays null.
+        if (udpSocket != null) {
+            udpSocket.close();
+        }
+    }
+
+    /**
+     * Determines the last zxid recorded under {@code dataLogDir}.
+     *
+     * The directory is scanned for "log.*" and "snapshot.*" files and the
+     * highest zxid encoded in each kind of file name is collected. If the
+     * newest snapshot is newer than the newest log, its zxid is returned.
+     * Otherwise the newest log file is read transaction by transaction and
+     * the zxid of the last complete transaction is returned.
+     *
+     * @return 0 if the directory cannot be listed; otherwise the zxid as
+     *         described above. If the newest log cannot be read, or holds
+     *         no complete transaction, the zxid from its file name is used.
+     */
+    long getLastLoggedZxid() {
+        File[] list = dataLogDir.listFiles();
+        if (list == null) {
+            return 0;
+        }
+        long maxLog = -1;
+        long maxSnapShot = 0;
+        for (File f : list) {
+            String name = f.getName();
+            if (name.startsWith("log.")) {
+                long zxid = ZooKeeperServer.getZxidFromName(name, "log");
+                if (zxid > maxLog) {
+                    maxLog = zxid;
+                }
+            } else if (name.startsWith("snapshot.")) {
+                long zxid = ZooKeeperServer.getZxidFromName(name,
+                        "snapshot");
+                // Track the maximum over snapshots only. Comparing against
+                // maxLog here made the result depend on the (unspecified)
+                // order in which listFiles() returns the entries.
+                if (zxid > maxSnapShot) {
+                    maxSnapShot = zxid;
+                }
+            }
+        }
+        if (maxSnapShot > maxLog) {
+            return maxSnapShot;
+        }
+        long zxid = maxLog;
+        FileInputStream logStream = null;
+        try {
+            logStream = new FileInputStream(new File(dataLogDir, "log."
+                    + Long.toHexString(maxLog)));
+            BinaryInputArchive ia = BinaryInputArchive.getArchive(logStream);
+            while (true) {
+                byte[] bytes = ia.readBuffer("txnEntry");
+                if (bytes.length == 0) {
+                    // Since we preallocate, we define EOF to be an
+                    // empty transaction
+                    break;
+                }
+                // Each record must be terminated by a 'B' marker byte;
+                // anything else ends the scan.
+                int B = ia.readByte("EOR");
+                if (B != 'B') {
+                    break;
+                }
+                InputArchive bia = BinaryInputArchive
+                        .getArchive(new ByteArrayInputStream(bytes));
+                TxnHeader hdr = new TxnHeader();
+                hdr.deserialize(bia, "hdr");
+                zxid = hdr.getZxid();
+            }
+        } catch (IOException e) {
+            LOG.warn("Unexpected exception", e);
+        } finally {
+            try {
+                if (logStream != null) {
+                    logStream.close();
+                }
+            } catch (IOException e) {
+                LOG.warn("Unexpected exception",e);
+            }
+        }
+        return zxid;
+    }
+
+    /**
+     * Creates a peer through {@code qpFactory}, starts it and waits for its
+     * thread to finish. Any exception is logged as fatal. The JVM is then
+     * terminated with exit status 2 in every case.
+     */
+    public static void runPeer(QuorumPeer.Factory qpFactory) {
+        try {
+            QuorumStats.registerAsConcrete();
+            NIOServerCnxn.Factory connectionFactory =
+                    qpFactory.createConnectionFactory();
+            QuorumPeer peer = qpFactory.create(connectionFactory);
+            peer.start();
+            peer.join();
+        } catch (Exception failure) {
+            LOG.fatal("Unexpected exception", failure);
+        }
+        System.exit(2);
+    }
+
+    public String[] getQuorumPeers() {
+        List<String> l = new ArrayList<String>();
+        synchronized (this) {
+            if (leader != null) {
+                synchronized (leader.followers) {
+                    for (FollowerHandler fh : leader.followers) {
+                        if (fh.s == null)
+                            continue;
+                        String s = fh.s.getRemoteSocketAddress().toString();
+                        if (leader.isFollowerSynced(fh))
+                            s += "*";
+                        l.add(s);
+                    }
+                }
+            } else if (follower != null) {
+                l.add(follower.sock.getRemoteSocketAddress().toString());
+            }
+        }
+        return l.toArray(new String[0]);
+    }
+
+    public String getServerState() {
+        switch (state) {
+        case LOOKING:
+            return QuorumStats.Provider.LOOKING_STATE;
+        case LEADING:
+            return QuorumStats.Provider.LEADING_STATE;
+        case FOLLOWING:
+            return QuorumStats.Provider.FOLLOWING_STATE;
+        }
+        return QuorumStats.Provider.UNKNOWN_STATE;
+    }
+
+    /**
+     * Entry point. With exactly two arguments the call is handed straight
+     * to {@code ZooKeeperServer.main}. Otherwise the arguments are parsed
+     * as a quorum peer configuration: a standalone configuration is also
+     * delegated to {@code ZooKeeperServer.main}, anything else runs a
+     * quorum peer through {@code runPeer}.
+     */
+    public static void main(String args[]) {
+        if (args.length == 2) {
+            ZooKeeperServer.main(args);
+            return;
+        }
+        QuorumPeerConfig.parse(args);
+
+        if (QuorumPeerConfig.isStandalone()) {
+            // there is only one server in the quorum -- run as standalone
+            ZooKeeperServer.main(args);
+            return;
+        }
+
+        QuorumPeer.Factory peerFactory = new QuorumPeer.Factory() {
+            public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory)
+                    throws IOException {
+                return new QuorumPeer(cnxnFactory);
+            }
+            public NIOServerCnxn.Factory createConnectionFactory()
+                    throws IOException {
+                return new NIOServerCnxn.Factory(getClientPort());
+            }
+        };
+        runPeer(peerFactory);
+    }
+}

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Tue Jun 24 12:04:58 2008
@@ -1,210 +1,210 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Properties;
-import java.util.Map.Entry;
-
-import org.apache.log4j.Logger;
-
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-
-public class QuorumPeerConfig extends ServerConfig {
-    private static final Logger LOG = Logger.getLogger(QuorumPeerConfig.class);
-
-    private int tickTime;
-    private int initLimit;
-    private int syncLimit;
-    private int electionAlg;
-    private int electionPort;
-    private ArrayList<QuorumServer> servers = null;
-    private long serverId;
-
-    private QuorumPeerConfig(int port, String dataDir, String dataLogDir) {
-        super(port, dataDir, dataLogDir);
-    }
-
-    public static void parse(String[] args) {
-        if(instance!=null)
-            return;
-
-        try {
-            if (args.length != 1) {
-                System.err.println("USAGE: configFile");
-                System.exit(2);
-            }
-            File zooCfgFile = new File(args[0]);
-            if (!zooCfgFile.exists()) {
-                LOG.error(zooCfgFile.toString() + " file is missing");
-                System.exit(2);
-            }
-            Properties cfg = new Properties();
-            cfg.load(new FileInputStream(zooCfgFile));
-            ArrayList<QuorumServer> servers = new ArrayList<QuorumServer>();
-            String dataDir = null;
-            String dataLogDir = null;
-            int clientPort = 0;
-            int tickTime = 0;
-            int initLimit = 0;
-            int syncLimit = 0;
-            int electionAlg = 0;
-            int electionPort = 0;
-            for (Entry<Object, Object> entry : cfg.entrySet()) {
-                String key = entry.getKey().toString();
-                String value = entry.getValue().toString();
-                if (key.equals("dataDir")) {
-                    dataDir = value;
-                } else if (key.equals("dataLogDir")) {
-                    dataLogDir = value;
-                } else if (key.equals("clientPort")) {
-                    clientPort = Integer.parseInt(value);
-                } else if (key.equals("tickTime")) {
-                    tickTime = Integer.parseInt(value);
-                } else if (key.equals("initLimit")) {
-                    initLimit = Integer.parseInt(value);
-                } else if (key.equals("syncLimit")) {
-                    syncLimit = Integer.parseInt(value);
-                } else if (key.equals("electionAlg")) {
-                    electionAlg = Integer.parseInt(value);
-                } else if (key.equals("electionPort")) {
-                    electionPort = Integer.parseInt(value);
-                } else if (key.startsWith("server.")) {
-                    int dot = key.indexOf('.');
-                    long sid = Long.parseLong(key.substring(dot + 1));
-                    String parts[] = value.split(":");
-                    if (parts.length != 2) {
-                        LOG.error(value
-                                + " does not have the form host:port");
-                    }
-                    InetSocketAddress addr = new InetSocketAddress(parts[0],
-                            Integer.parseInt(parts[1]));
-                    servers.add(new QuorumServer(sid, addr));
-                } else {
-                    System.setProperty("zookeeper." + key, value);
-                }
-            }
-            if (dataDir == null) {
-                LOG.error("dataDir is not set");
-                System.exit(2);
-            }
-            if (dataLogDir == null) {
-                dataLogDir = dataDir;
-            } else {
-                if (!new File(dataLogDir).isDirectory()) {
-                    LOG.error("dataLogDir " + dataLogDir+ " is missing.");
-                    System.exit(2);
-                }
-            }
-            if (clientPort == 0) {
-                LOG.error("clientPort is not set");
-                System.exit(2);
-            }
-            if (tickTime == 0) {
-                LOG.error("tickTime is not set");
-                System.exit(2);
-            }
-            if (servers.size() > 1 && initLimit == 0) {
-                LOG.error("initLimit is not set");
-                System.exit(2);
-            }
-            if (servers.size() > 1 && syncLimit == 0) {
-                LOG.error("syncLimit is not set");
-                System.exit(2);
-            }
-            QuorumPeerConfig conf = new QuorumPeerConfig(clientPort, dataDir,
-                    dataLogDir);
-            conf.tickTime = tickTime;
-            conf.initLimit = initLimit;
-            conf.syncLimit = syncLimit;
-            conf.electionAlg = electionAlg;
-            conf.electionPort = electionPort;
-            conf.servers = servers;
-            if (servers.size() > 1) {
-                File myIdFile = new File(dataDir, "myid");
-                if (!myIdFile.exists()) {
-                    LOG.error(myIdFile.toString() + " file is missing");
-                    System.exit(2);
-                }
-                BufferedReader br = new BufferedReader(new FileReader(myIdFile));
-                String myIdString = br.readLine();
-                try {
-                    conf.serverId = Long.parseLong(myIdString);
-                } catch (NumberFormatException e) {
-                    LOG.error(myIdString + " is not a number");
-                    System.exit(2);
-                }
-            }
-            instance=conf;
-        } catch (Exception e) {
-            LOG.error("FIXMSG",e);
-            System.exit(2);
-        }
-    }
-
-    protected boolean isStandaloneServer(){
-        return QuorumPeerConfig.getServers().size() <= 1;
-    }
-
-    public static int getTickTime() {
-        assert instance instanceof QuorumPeerConfig;
-        return ((QuorumPeerConfig)instance).tickTime;
-    }
-
-    public static int getInitLimit() {
-        assert instance instanceof QuorumPeerConfig;
-        return ((QuorumPeerConfig)instance).initLimit;
-    }
-
-    public static int getSyncLimit() {
-        assert instance instanceof QuorumPeerConfig;
-        return ((QuorumPeerConfig)instance).syncLimit;
-    }
-
-    public static int getElectionAlg() {
-        assert instance instanceof QuorumPeerConfig;
-        return ((QuorumPeerConfig)instance).electionAlg;
-    }
-
-    public static int getElectionPort() {
-        assert instance instanceof QuorumPeerConfig;
-        return ((QuorumPeerConfig)instance).electionPort;
-    }
-
-    public static ArrayList<QuorumServer> getServers() {
-        assert instance instanceof QuorumPeerConfig;
-        return ((QuorumPeerConfig)instance).servers;
-    }
-
-    public static int getQuorumSize(){
-        assert instance instanceof QuorumPeerConfig;
-        return ((QuorumPeerConfig)instance).servers.size();
-    }
-
-    public static long getServerId() {
-        assert instance instanceof QuorumPeerConfig;
-        return ((QuorumPeerConfig)instance).serverId;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.log4j.Logger;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+
+/**
+ * Configuration of a quorum peer, loaded from a properties file by
+ * {@link #parse(String[])} and published through the inherited static
+ * {@code instance}. The static getters cast that instance back to this
+ * class, so they are only meaningful after a successful parse.
+ */
+public class QuorumPeerConfig extends ServerConfig {
+    private static final Logger LOG = Logger.getLogger(QuorumPeerConfig.class);
+
+    private int tickTime;
+    private int initLimit;
+    private int syncLimit;
+    private int electionAlg;
+    private int electionPort;
+    private ArrayList<QuorumServer> servers = null;
+    private long serverId;
+
+    private QuorumPeerConfig(int port, String dataDir, String dataLogDir) {
+        super(port, dataDir, dataLogDir);
+    }
+
+    /**
+     * Parses the configuration file named by the single argument and
+     * installs the result as the shared {@code instance}. Does nothing if an
+     * instance already exists.
+     *
+     * Recognized keys are dataDir, dataLogDir, clientPort, tickTime,
+     * initLimit, syncLimit, electionAlg, electionPort and
+     * {@code server.<id>=host:port}. Every other key is exported as the
+     * system property {@code zookeeper.<key>}. When more than one server is
+     * configured, the server id is read from the {@code myid} file in
+     * dataDir.
+     *
+     * Any validation failure or exception is reported and terminates the
+     * JVM with exit status 2.
+     *
+     * @param args must contain exactly one element: the config file path
+     */
+    public static void parse(String[] args) {
+        if(instance!=null)
+            return;
+
+        try {
+            if (args.length != 1) {
+                System.err.println("USAGE: configFile");
+                System.exit(2);
+            }
+            File zooCfgFile = new File(args[0]);
+            if (!zooCfgFile.exists()) {
+                LOG.error(zooCfgFile.toString() + " file is missing");
+                System.exit(2);
+            }
+            Properties cfg = new Properties();
+            // Close the stream once loaded instead of leaking the descriptor.
+            FileInputStream cfgStream = new FileInputStream(zooCfgFile);
+            try {
+                cfg.load(cfgStream);
+            } finally {
+                cfgStream.close();
+            }
+            ArrayList<QuorumServer> servers = new ArrayList<QuorumServer>();
+            String dataDir = null;
+            String dataLogDir = null;
+            int clientPort = 0;
+            int tickTime = 0;
+            int initLimit = 0;
+            int syncLimit = 0;
+            int electionAlg = 0;
+            int electionPort = 0;
+            for (Entry<Object, Object> entry : cfg.entrySet()) {
+                String key = entry.getKey().toString();
+                String value = entry.getValue().toString();
+                if (key.equals("dataDir")) {
+                    dataDir = value;
+                } else if (key.equals("dataLogDir")) {
+                    dataLogDir = value;
+                } else if (key.equals("clientPort")) {
+                    clientPort = Integer.parseInt(value);
+                } else if (key.equals("tickTime")) {
+                    tickTime = Integer.parseInt(value);
+                } else if (key.equals("initLimit")) {
+                    initLimit = Integer.parseInt(value);
+                } else if (key.equals("syncLimit")) {
+                    syncLimit = Integer.parseInt(value);
+                } else if (key.equals("electionAlg")) {
+                    electionAlg = Integer.parseInt(value);
+                } else if (key.equals("electionPort")) {
+                    electionPort = Integer.parseInt(value);
+                } else if (key.startsWith("server.")) {
+                    int dot = key.indexOf('.');
+                    long sid = Long.parseLong(key.substring(dot + 1));
+                    String parts[] = value.split(":");
+                    if (parts.length != 2) {
+                        LOG.error(value
+                                + " does not have the form host:port");
+                    }
+                    if (parts.length < 2) {
+                        // There is no port to read. Previously this fell
+                        // through to parts[1] and died with an opaque
+                        // ArrayIndexOutOfBoundsException; exit with the
+                        // message logged above instead.
+                        System.exit(2);
+                    }
+                    InetSocketAddress addr = new InetSocketAddress(parts[0],
+                            Integer.parseInt(parts[1]));
+                    servers.add(new QuorumServer(sid, addr));
+                } else {
+                    System.setProperty("zookeeper." + key, value);
+                }
+            }
+            if (dataDir == null) {
+                LOG.error("dataDir is not set");
+                System.exit(2);
+            }
+            if (dataLogDir == null) {
+                dataLogDir = dataDir;
+            } else {
+                if (!new File(dataLogDir).isDirectory()) {
+                    LOG.error("dataLogDir " + dataLogDir+ " is missing.");
+                    System.exit(2);
+                }
+            }
+            if (clientPort == 0) {
+                LOG.error("clientPort is not set");
+                System.exit(2);
+            }
+            if (tickTime == 0) {
+                LOG.error("tickTime is not set");
+                System.exit(2);
+            }
+            // initLimit and syncLimit are only mandatory for a real quorum.
+            if (servers.size() > 1 && initLimit == 0) {
+                LOG.error("initLimit is not set");
+                System.exit(2);
+            }
+            if (servers.size() > 1 && syncLimit == 0) {
+                LOG.error("syncLimit is not set");
+                System.exit(2);
+            }
+            QuorumPeerConfig conf = new QuorumPeerConfig(clientPort, dataDir,
+                    dataLogDir);
+            conf.tickTime = tickTime;
+            conf.initLimit = initLimit;
+            conf.syncLimit = syncLimit;
+            conf.electionAlg = electionAlg;
+            conf.electionPort = electionPort;
+            conf.servers = servers;
+            if (servers.size() > 1) {
+                File myIdFile = new File(dataDir, "myid");
+                if (!myIdFile.exists()) {
+                    LOG.error(myIdFile.toString() + " file is missing");
+                    System.exit(2);
+                }
+                BufferedReader br = new BufferedReader(new FileReader(myIdFile));
+                String myIdString;
+                try {
+                    myIdString = br.readLine();
+                } finally {
+                    br.close();
+                }
+                try {
+                    conf.serverId = Long.parseLong(myIdString);
+                } catch (NumberFormatException e) {
+                    LOG.error(myIdString + " is not a number");
+                    System.exit(2);
+                }
+            }
+            instance=conf;
+        } catch (Exception e) {
+            LOG.error("Failed to parse configuration", e);
+            System.exit(2);
+        }
+    }
+
+    /**
+     * Reports whether the configuration describes at most one server.
+     */
+    protected boolean isStandaloneServer(){
+        return QuorumPeerConfig.getServers().size() <= 1;
+    }
+
+    /** Returns the configured tickTime. */
+    public static int getTickTime() {
+        assert instance instanceof QuorumPeerConfig;
+        return ((QuorumPeerConfig)instance).tickTime;
+    }
+
+    /** Returns the configured initLimit. */
+    public static int getInitLimit() {
+        assert instance instanceof QuorumPeerConfig;
+        return ((QuorumPeerConfig)instance).initLimit;
+    }
+
+    /** Returns the configured syncLimit. */
+    public static int getSyncLimit() {
+        assert instance instanceof QuorumPeerConfig;
+        return ((QuorumPeerConfig)instance).syncLimit;
+    }
+
+    /** Returns the configured electionAlg selector (0 when not set). */
+    public static int getElectionAlg() {
+        assert instance instanceof QuorumPeerConfig;
+        return ((QuorumPeerConfig)instance).electionAlg;
+    }
+
+    /** Returns the configured electionPort (0 when not set). */
+    public static int getElectionPort() {
+        assert instance instanceof QuorumPeerConfig;
+        return ((QuorumPeerConfig)instance).electionPort;
+    }
+
+    /** Returns the list built from the {@code server.<id>} entries. */
+    public static ArrayList<QuorumServer> getServers() {
+        assert instance instanceof QuorumPeerConfig;
+        return ((QuorumPeerConfig)instance).servers;
+    }
+
+    /** Returns the number of configured servers. */
+    public static int getQuorumSize(){
+        assert instance instanceof QuorumPeerConfig;
+        return ((QuorumPeerConfig)instance).servers.size();
+    }
+
+    /** Returns the id read from the myid file (0 if it was never read). */
+    public static long getServerId() {
+        assert instance instanceof QuorumPeerConfig;
+        return ((QuorumPeerConfig)instance).serverId;
+    }
+}

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumStats.java Tue Jun 24 12:04:58 2008
@@ -1,20 +1,20 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.zookeeper.server.quorum;
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/Profiler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/Profiler.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/Profiler.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/Profiler.java Tue Jun 24 12:04:58 2008
@@ -1,40 +1,40 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.util;
-
-import org.apache.log4j.Logger;
-
-public class Profiler {
-    private static final Logger LOG = Logger.getLogger(Profiler.class);
-
-    public interface Operation<T> {
-        public T execute() throws Exception;
-    }
-
-    public static <T> T profile(Operation<T> op, long timeout, String message)
-            throws Exception {
-        long start = System.currentTimeMillis();
-        T res = op.execute();
-        long end = System.currentTimeMillis();
-        if (end - start > timeout) {
-            LOG.warn("Elapsed "+(end - start) + " ms: " + message);
-        }
-        return res;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Helper that times an operation and logs a warning when it runs longer
+ * than a caller-supplied threshold.
+ */
+public class Profiler {
+    private static final Logger LOG = Logger.getLogger(Profiler.class);
+
+    /** A unit of work to be timed by {@code profile}. */
+    public interface Operation<T> {
+        public T execute() throws Exception;
+    }
+
+    /**
+     * Runs {@code op} and returns its result. If the wall-clock time spent
+     * in {@code execute()} exceeds {@code timeout} milliseconds, a warning
+     * containing the elapsed time and {@code message} is logged.
+     *
+     * @throws Exception whatever {@code op.execute()} throws; nothing is
+     *         logged in that case
+     */
+    public static <T> T profile(Operation<T> op, long timeout, String message)
+            throws Exception {
+        final long startedAt = System.currentTimeMillis();
+        final T result = op.execute();
+        final long elapsedMs = System.currentTimeMillis() - startedAt;
+        if (elapsedMs > timeout) {
+            LOG.warn("Elapsed " + elapsedMs + " ms: " + message);
+        }
+        return result;
+    }
+}

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/version/util/VerGen.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/version/util/VerGen.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/version/util/VerGen.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/version/util/VerGen.java Tue Jun 24 12:04:58 2008
@@ -1,20 +1,20 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.zookeeper.version.util;
 

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java Tue Jun 24 12:04:58 2008
@@ -1,70 +1,70 @@
-package org.apache.zookeeper.server;
-
-import java.io.File;
-import java.util.List;
-
-import junit.framework.TestCase;
-
-public class ZooKeeperServerTest extends TestCase {
-    public void testSortDataDirAscending() {
-        File[] files = new File[5];
-        
-        files[0] = new File("foo.10027c6de");
-        files[1] = new File("foo.10027c6df");
-        files[2] = new File("bar.10027c6dd");
-        files[3] = new File("foo.10027c6dc");
-        files[4] = new File("foo.20027c6dc");
-        
-        File[] orig = files.clone();
-
-        List<File> filelist = ZooKeeperServer.sortDataDir(files, "foo", true);
-        
-        assertEquals(orig[2], filelist.get(0));
-        assertEquals(orig[3], filelist.get(1));
-        assertEquals(orig[0], filelist.get(2));
-        assertEquals(orig[1], filelist.get(3));
-        assertEquals(orig[4], filelist.get(4));
-    }
-
-    public void testSortDataDirDescending() {
-        File[] files = new File[5];
-        
-        files[0] = new File("foo.10027c6de");
-        files[1] = new File("foo.10027c6df");
-        files[2] = new File("bar.10027c6dd");
-        files[3] = new File("foo.10027c6dc");
-        files[4] = new File("foo.20027c6dc");
-
-        File[] orig = files.clone();
-
-        List<File> filelist = ZooKeeperServer.sortDataDir(files, "foo", false);
-        
-        assertEquals(orig[4], filelist.get(0));
-        assertEquals(orig[1], filelist.get(1));
-        assertEquals(orig[0], filelist.get(2));
-        assertEquals(orig[3], filelist.get(3));
-        assertEquals(orig[2], filelist.get(4));
-    }
-    
-    public void testGetLogFiles() {
-        File[] files = new File[5];
-        
-        files[0] = new File("log.10027c6de");
-        files[1] = new File("log.10027c6df");
-        files[2] = new File("snapshot.10027c6dd");
-        files[3] = new File("log.10027c6dc");
-        files[4] = new File("log.20027c6dc");
-
-        File[] orig = files.clone();
-
-        File[] filelist = 
-            ZooKeeperServer.getLogFiles(files,
-                Long.parseLong("10027c6de", 16));
-        
-        assertEquals(3, filelist.length);
-        assertEquals(orig[0], filelist[0]);
-        assertEquals(orig[1], filelist[1]);
-        assertEquals(orig[4], filelist[2]);
-    }
-
-}
+package org.apache.zookeeper.server;
+
+import java.io.File;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+public class ZooKeeperServerTest extends TestCase {
+    public void testSortDataDirAscending() {
+        File[] files = new File[5];
+        // Unsorted fixture: four "foo.<hex>" names plus one "bar.<hex>" name.
+        files[0] = new File("foo.10027c6de");
+        files[1] = new File("foo.10027c6df");
+        files[2] = new File("bar.10027c6dd");
+        files[3] = new File("foo.10027c6dc");
+        files[4] = new File("foo.20027c6dc");
+        // Copy of the input order; the assertions index into this, not into files.
+        File[] orig = files.clone();
+
+        List<File> filelist = ZooKeeperServer.sortDataDir(files, "foo", true);
+        // Expect the "bar" file first, then the "foo" files by ascending hex suffix.
+        assertEquals(orig[2], filelist.get(0));
+        assertEquals(orig[3], filelist.get(1));
+        assertEquals(orig[0], filelist.get(2));
+        assertEquals(orig[1], filelist.get(3));
+        assertEquals(orig[4], filelist.get(4));
+    }
+    /** Same fixture with the third argument false: expects the exact reverse order. */
+    public void testSortDataDirDescending() {
+        File[] files = new File[5];
+        // Same unsorted fixture as the ascending test.
+        files[0] = new File("foo.10027c6de");
+        files[1] = new File("foo.10027c6df");
+        files[2] = new File("bar.10027c6dd");
+        files[3] = new File("foo.10027c6dc");
+        files[4] = new File("foo.20027c6dc");
+
+        File[] orig = files.clone();
+
+        List<File> filelist = ZooKeeperServer.sortDataDir(files, "foo", false);
+        // Expect the "foo" files by descending hex suffix, then the "bar" file last.
+        assertEquals(orig[4], filelist.get(0));
+        assertEquals(orig[1], filelist.get(1));
+        assertEquals(orig[0], filelist.get(2));
+        assertEquals(orig[3], filelist.get(3));
+        assertEquals(orig[2], filelist.get(4));
+    }
+    /** Checks which files getLogFiles returns when given the id 0x10027c6de. */
+    public void testGetLogFiles() {
+        File[] files = new File[5];
+        // Fixture: four "log.<hex>" names and one "snapshot.<hex>" name.
+        files[0] = new File("log.10027c6de");
+        files[1] = new File("log.10027c6df");
+        files[2] = new File("snapshot.10027c6dd");
+        files[3] = new File("log.10027c6dc");
+        files[4] = new File("log.20027c6dc");
+
+        File[] orig = files.clone();
+
+        File[] filelist = 
+            ZooKeeperServer.getLogFiles(files,
+                Long.parseLong("10027c6de", 16));
+        // Expect only log.10027c6de, log.10027c6df and log.20027c6dc, in that order.
+        assertEquals(3, filelist.length);
+        assertEquals(orig[0], filelist[0]);
+        assertEquals(orig[1], filelist[1]);
+        assertEquals(orig[4], filelist[2]);
+    }
+
+}